feat(rag): fb-41 PR-3b-ii ScriptedLm + multi-hop tests + refusal hop trace #170

Merged
altair823 merged 3 commits from feat/fb-41-pr-3b-ii-scripted-lm-tests into main 2026-05-25 08:25:44 +00:00
5 changed files with 761 additions and 41 deletions

View File

@@ -189,9 +189,13 @@ pub struct RagCfg {
#[serde(default = "default_multi_hop_max_depth")]
pub multi_hop_max_depth: u32,
/// p9-fb-41: cap on how many sub-queries the LLM may emit in a
/// single decompose / decide call. Mirrors
/// [`MULTI_HOP_MAX_SUB_QUERIES_DEFAULT`] in kebab-rag — the
/// const is the hard floor while this is the runtime knob.
/// single decompose / decide call. This is the *prompt-side
/// soft hint* — the value the pipeline injects into the
/// decompose / decide prompts so the LLM knows what to aim for.
/// kebab-rag enforces a separate compile-time hard ceiling
/// (`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`, currently 10) as a
/// safety net against misbehaving models — if you raise this
/// knob above the hard cap, bump the const in the same PR.
/// Default `5`.
#[serde(default = "default_multi_hop_max_sub_queries_per_iter")]
pub multi_hop_max_sub_queries_per_iter: u32,

View File

@@ -79,8 +79,18 @@ pub struct HopRecord {
/// the final iter is the synthesize call.
pub iter: u32,
pub kind: HopKind,
/// Sub-queries the LLM emitted at this iter. For the synthesize
/// hop this is empty (no sub-queries — just the final answer).
/// Sub-queries associated with this hop. The meaning depends on
/// `kind`:
///
/// - [`HopKind::Decompose`]: the initial sub-queries the LLM
/// broke the original user query into. These drive the
/// `iter=1` retrieval round.
/// - [`HopKind::Decide`]: the *new* sub-queries the LLM
/// emitted to drive the next retrieval round. Empty when the
/// LLM signalled stop OR when `forced_stop = true` (cap hit
/// or parse-degraded).
/// - [`HopKind::Synthesize`]: always empty — the final hop
/// produces the user-visible answer, not more sub-queries.
#[serde(default)]
pub sub_queries: Vec<String>,
/// Number of *new* chunks the retrieval round contributed to the
@@ -98,6 +108,13 @@ pub struct HopRecord {
/// Wall-clock latency of the LLM call for this hop, in
/// milliseconds. Useful for cost / latency analysis when a
/// `kebab eval` run records `Answer.hops`.
///
/// `0` is overloaded: it means "no LLM call happened at this
/// hop" when (a) the hop was a Decide skipped due to
/// `forced_stop` (depth-cap or pool-cap fired before the LLM
/// was asked) or (b) the pool was empty before any decide
/// could run. Treat `0` as "absent or instantaneous" rather
/// than as a genuine measurement.
pub llm_call_ms: u32,
}

View File

@@ -309,10 +309,10 @@ impl RagPipeline {
// ── 2. Score gate ──────────────────────────────────────────────────
if hits.is_empty() {
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(query, &opts, k_effective, started, None);
}
if top_score < self.config.rag.score_gate {
return self.refuse_score_gate(query, &opts, &hits, k_effective, started);
return self.refuse_score_gate(query, &opts, &hits, k_effective, started, None);
}
// ── 3. Pack context ────────────────────────────────────────────────
@@ -330,7 +330,7 @@ impl RagPipeline {
"kb-rag: all retrieved chunks were unfetchable from the store; \
falling back to NoChunks refusal"
);
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(query, &opts, k_effective, started, None);
}
// ── 4. Render prompt ───────────────────────────────────────────────
@@ -697,6 +697,16 @@ impl RagPipeline {
if forced_stop || pool.is_empty() {
(Vec::new(), 0)
} else {
// Snippet-based preview: each pool entry contributes
// its `SearchHit.snippet` (already truncated upstream
// by the retriever). `max_pool_chunks` acts as the
// implicit cap on this string's length — the loop
// breaks before we accumulate more pool entries.
// We intentionally do NOT route this through
// `pack_context` (no full chunk text fetch, no
// marker numbering): decide only needs gist to
// judge sufficiency, and full text is reserved for
// the terminal synthesize call.
let preview = pool
.iter()
.enumerate()
@@ -711,13 +721,14 @@ impl RagPipeline {
depth_remaining,
&opts,
)?;
match decide_result {
Some(qs) if !qs.is_empty() => (qs, ms),
// Empty array OR parse failure → stop signal.
// Parse failure is NOT a refusal — graceful
// degrade to early synthesize (per spec §9).
_ => (Vec::new(), ms),
}
// `parse_decompose_response` post-condition: when
// it returns `Some(qs)`, `qs` is guaranteed
// non-empty (and trimmed + hard-capped). `None`
// covers both "parse failure" and "empty array
// after trim" — both mean stop. Parse failure is
// NOT a refusal here (spec §9 — graceful degrade
// to early synthesize on the decide hop only).
(decide_result.unwrap_or_default(), ms)
};
hops.push(HopRecord {
@@ -756,11 +767,27 @@ impl RagPipeline {
let top_score = pool.first().map(|h| h.retrieval.fusion_score).unwrap_or(0.0);
// ── 3. Score gate / no chunks ──────────────────────────────────────
// PR-3b-ii: forward the partial hop trace into the refusal so
// a `--multi-hop` user can still see which decompose / decide
// signals fired before the score-gate / no-chunks bailout.
if pool.is_empty() {
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(
query,
&opts,
k_effective,
started,
Some(hops),
);
}
if top_score < self.config.rag.score_gate {
return self.refuse_score_gate(query, &opts, &pool, k_effective, started);
return self.refuse_score_gate(
query,
&opts,
&pool,
k_effective,
started,
Some(hops),
);
}
// ── 4. Pack context ────────────────────────────────────────────────
@@ -772,7 +799,13 @@ impl RagPipeline {
pool_size = pool.len(),
"kb-rag: multi-hop pool chunks all unfetchable; falling back to NoChunks"
);
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(
query,
&opts,
k_effective,
started,
Some(hops),
);
}
// ── 5. Synthesize prompt ───────────────────────────────────────────
@@ -1262,12 +1295,20 @@ impl RagPipeline {
/// Refusal path for empty hits — `RefusalReason::NoChunks`. No LLM
/// call. The persisted row records `chunks_returned = 0`.
///
/// `hops` is `None` on the single-pass path; the multi-hop path
/// (PR-3b-ii) forwards the partial hop trace accumulated up to
/// the refusal point so a `--multi-hop` user can still see which
/// decompose / decide signals fired before retrieval came up
/// empty. The trace is wire-additive (`Answer.hops` already
/// `skip_serializing_if = None`).
fn refuse_no_chunks(
&self,
query: &str,
opts: &AskOpts,
k_effective: usize,
started: std::time::Instant,
hops: Option<Vec<HopRecord>>,
) -> Result<Answer> {
let trace_id = mint_trace_id(query, 0.0, &self.llm.model_ref().id);
let elapsed_ms = u32::try_from(started.elapsed().as_millis()).unwrap_or(u32::MAX);
@@ -1298,11 +1339,12 @@ impl RagPipeline {
created_at: OffsetDateTime::now_utc(),
conversation_id: opts.conversation_id.clone(),
turn_index: opts.turn_index,
// p9-fb-41 Step 2 of PR-3: every Answer literal carries
// `hops`. Single-pass + refusal paths leave it `None`;
// only the multi-hop happy path will set `Some(...)` in
// Step 5 once the decide loop populates a hop trace.
hops: None,
// p9-fb-41 PR-3b-ii: single-pass callers pass `None`;
// `ask_multi_hop` forwards the partial hop trace it
// built up to the refusal point. Either way `Answer.hops`
// stays `skip_serializing_if = None`, so single-pass
// wire output is unchanged.
hops,
};
if let Err(e) = self.docs.put_answer(&answer, query, None) {
tracing::warn!(target: "kebab-rag", error = %e, "kb-rag: put_answer (NoChunks) failed");
@@ -1313,6 +1355,11 @@ impl RagPipeline {
/// Refusal path for top-1 below the gate — `RefusalReason::ScoreGate`.
/// No LLM call. Lists up to three near-miss candidates verbatim in
/// `answer` so the user gets actionable context.
///
/// `hops` is `None` on the single-pass path; the multi-hop path
/// (PR-3b-ii) forwards the partial hop trace accumulated before
/// the gate refusal. See [`Self::refuse_no_chunks`] for the
/// shared rationale.
fn refuse_score_gate(
&self,
query: &str,
@@ -1320,6 +1367,7 @@ impl RagPipeline {
hits: &[SearchHit],
k_effective: usize,
started: std::time::Instant,
hops: Option<Vec<HopRecord>>,
) -> Result<Answer> {
let top_score = hits[0].retrieval.fusion_score;
let gate = self.config.rag.score_gate;
@@ -1385,11 +1433,8 @@ impl RagPipeline {
created_at: OffsetDateTime::now_utc(),
conversation_id: opts.conversation_id.clone(),
turn_index: opts.turn_index,
// p9-fb-41 Step 2 of PR-3: every Answer literal carries
// `hops`. Single-pass + refusal paths leave it `None`;
// only the multi-hop happy path will set `Some(...)` in
// Step 5 once the decide loop populates a hop trace.
hops: None,
// p9-fb-41 PR-3b-ii: see refuse_no_chunks' identical comment.
hops,
};
if let Err(e) = self.docs.put_answer(&answer, query, None) {
tracing::warn!(target: "kebab-rag", error = %e, "kb-rag: put_answer (ScoreGate) failed");
@@ -1443,11 +1488,27 @@ fn compute_stale(
/// (design §9) can tell the two paths apart in `eval_runs.config_snapshot_json`.
pub(crate) const PROMPT_TEMPLATE_VERSION_MULTI_HOP: &str = "rag-multi-hop-v1";
/// Max sub-questions the decompose call may emit per iteration. PR-2
/// ships with a fixed depth=2 pipeline (decompose once + synthesize),
/// so this cap acts on the single decompose call. PR-3 (dynamic iter)
/// will reuse the same cap for the per-iter decide call too.
pub(crate) const MULTI_HOP_MAX_SUB_QUERIES_DEFAULT: usize = 5;
/// Hard parse-side cap on how many sub-question strings
/// [`parse_decompose_response`] will accept from a single LLM response,
/// regardless of `RagCfg.multi_hop_max_sub_queries_per_iter`.
///
/// This is intentionally a *defensive* compile-time cap, not the
/// user-tunable knob:
///
/// - `RagCfg.multi_hop_max_sub_queries_per_iter` (default 5) is the
/// *prompt-side soft hint* — the value the pipeline injects into the
/// decompose / decide prompts so the LLM knows what to aim for.
/// Users can raise it via config / env.
/// - `MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP` (this const) is the
/// *parse-side hard ceiling* — a misbehaving model that emits 100
/// sub-questions gets cropped here so the retrieve loop does not
/// spawn an unbounded number of search calls per iter.
///
/// In practice this is generous (the soft hint is 5 by default, the
/// hard cap is 10) so user-tunable expansion to ~10 stays unaffected.
/// If a future config knob exceeds this number, raise the const in
/// the same PR.
pub(crate) const MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP: usize = 10;
const MULTI_HOP_DECOMPOSE_SYSTEM_PROMPT: &str = "당신은 사용자의 질문을 다단계 검색에 필요한 sub-question 들로 분해하는 도구다.\n- multi-hop 정보가 필요한 경우 독립적으로 검색 가능한 sub-question 들로 분해한다.\n- 각 sub-question 은 자기 자신만으로 의미가 통해야 한다 (대명사 / \"위 답변\" 같은 reference 금지).\n- 원본이 이미 단순하면 원본 그대로 1 개만 반환한다.\n- 응답은 JSON array of strings 만 출력한다. 다른 prose / markdown fence / 설명 금지.";
@@ -1590,13 +1651,21 @@ fn mint_trace_id(query: &str, top_score: f32, model_id: &str) -> TraceId {
/// into a vector of sub-question strings. Strips a leading markdown
/// code-fence (```json ... ```), then deserializes as a JSON array of
/// strings, then trims each entry + drops empties + caps at
/// [`MULTI_HOP_MAX_SUB_QUERIES_DEFAULT`].
/// [`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`].
///
/// Returns `None` when:
/// - parse fails outright (not a JSON array of strings),
/// - the array deserializes but is empty after trim/drop,
///
/// in which case the caller surfaces `RefusalReason::MultiHopDecomposeFailed`.
/// in which case the caller surfaces `RefusalReason::MultiHopDecomposeFailed`
/// (for the initial decompose hop) or treats the signal as
/// "synthesize now" (for the decide hop — see
/// [`RagPipeline::multi_hop_decide`]).
///
/// `Some(non_empty)` is the only success shape: the post-conditions
/// guarantee at least one trimmed non-empty entry, capped at
/// [`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`]. Callers therefore do
/// not need a defensive `if !qs.is_empty()` guard.
fn parse_decompose_response(raw: &str) -> Option<Vec<String>> {
let stripped = strip_markdown_json_fence(raw.trim());
let arr: Vec<String> = serde_json::from_str(stripped).ok()?;
@@ -1604,7 +1673,7 @@ fn parse_decompose_response(raw: &str) -> Option<Vec<String>> {
.into_iter()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.take(MULTI_HOP_MAX_SUB_QUERIES_DEFAULT)
.take(MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP)
.collect();
if cleaned.is_empty() {
None
@@ -1696,12 +1765,15 @@ mod tests {
}
#[test]
fn parse_decompose_response_caps_at_max_sub_queries() {
// 7 entries; cap is MULTI_HOP_MAX_SUB_QUERIES_DEFAULT (=5).
let raw = r#"["a","b","c","d","e","f","g"]"#;
fn parse_decompose_response_caps_at_hard_cap() {
// 12 entries; parse-side hard cap = 10. Pins the cap and the
// truncation order (first-N keep, tail drop) so a future
// refactor can't accidentally relax the safety ceiling or
// re-order the take/filter chain.
let raw = r#"["a","b","c","d","e","f","g","h","i","j","k","l"]"#;
let out = parse_decompose_response(raw).unwrap();
assert_eq!(out.len(), MULTI_HOP_MAX_SUB_QUERIES_DEFAULT);
assert_eq!(out, vec!["a", "b", "c", "d", "e"]);
assert_eq!(out.len(), MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP);
assert_eq!(out, vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]);
}
#[test]

View File

@@ -206,6 +206,47 @@ impl Retriever for MockRetriever {
}
}
/// p9-fb-41 PR-3b-ii: scripted retriever. Returns a different
/// `Vec<SearchHit>` per `search` call from a pre-supplied sequence,
/// so a multi-hop test can simulate "iter 1 returns chunk A, iter 2
/// returns chunks A+B" (pool dedup) or "different sub-queries hit
/// different docs". Exhaustion returns an empty `Vec` (no panic) —
/// the pipeline already handles "no hits this round" gracefully via
/// the dedup loop, and a panic would conflate "test forgot a row"
/// with "pipeline made an unexpected extra retrieval call".
///
/// Use [`ScriptedRetriever::calls`] to assert the expected number
/// of retrievals occurred.
pub struct ScriptedRetriever {
hits_per_call: Vec<Vec<SearchHit>>,
next: std::sync::atomic::AtomicUsize,
}
impl ScriptedRetriever {
pub fn new(hits_per_call: Vec<Vec<SearchHit>>) -> Self {
Self {
hits_per_call,
next: std::sync::atomic::AtomicUsize::new(0),
}
}
pub fn calls(&self) -> usize {
self.next.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl Retriever for ScriptedRetriever {
fn search(&self, _q: &SearchQuery) -> anyhow::Result<Vec<SearchHit>> {
let idx = self
.next
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(self.hits_per_call.get(idx).cloned().unwrap_or_default())
}
fn index_version(&self) -> IndexVersion {
IndexVersion("test-iv".to_string())
}
}
/// Pad a short prefix to the 32-hex shape `kebab_core` newtypes expect.
pub fn id32(prefix: &str) -> String {
let mut s = prefix.to_string();
@@ -215,3 +256,131 @@ pub fn id32(prefix: &str) -> String {
s.truncate(32);
s
}
/// p9-fb-41 PR-3b-ii: scripted language model. Returns a different
/// canned response per `generate_stream` call from a pre-supplied
/// `Vec<String>`. Mirrors `MockLanguageModel`'s streaming contract
/// (one `TokenChunk::Token` per Unicode scalar, terminal `Done` with
/// `canned_usage`, stop-string truncation honoured) but lets a test
/// distinguish the decompose / per-iter decide / synthesize LLM calls
/// of `RagPipeline::ask_multi_hop` — each can return a different
/// payload (`["q1","q2"]`, `[]`, `"final answer [#1]"`, etc.).
///
/// Internally `Vec<String>` (immutable after construction) plus an
/// `AtomicUsize` index counter, so the type is `Send + Sync` and
/// tests wrap it in `Arc::new(ScriptedLm::new(...))` to share with
/// the pipeline. Tests can read `calls()` for an assertion on the
/// expected LLM call count.
///
/// Exhaustion (more calls than scripted responses) panics — tests
/// that need an "infinite" final response can supply a longer
/// script; the panic message names the call index so the test
/// failure points at the missing entry.
pub struct ScriptedLm {
model_id: String,
provider: String,
context_tokens: usize,
/// Canned responses in call order. Index `i` is returned on the
/// `i`-th `generate_stream` call (0-based).
responses: Vec<String>,
/// 0-based index of the next response to return on `generate_stream`.
next: std::sync::atomic::AtomicUsize,
canned_finish: kebab_core::FinishReason,
canned_usage: kebab_core::TokenUsage,
}
impl ScriptedLm {
/// Build a scripted LM with the default model_id/provider used by
/// the rest of the test suite (`mock-lm` / `mock`) and the
/// MockLanguageModel-equivalent canned usage. No knobs are
/// exposed today — every multi-hop test exercises the pipeline
/// flow, not the LM identity. Add builders only when a test
/// genuinely needs to override defaults.
pub fn new(responses: Vec<&str>) -> Self {
Self {
model_id: "mock-lm".to_string(),
provider: "mock".to_string(),
context_tokens: 32_768,
responses: responses.into_iter().map(str::to_string).collect(),
next: std::sync::atomic::AtomicUsize::new(0),
canned_finish: kebab_core::FinishReason::Stop,
canned_usage: kebab_core::TokenUsage {
prompt_tokens: 10,
completion_tokens: 5,
latency_ms: 7,
},
}
}
/// Total `generate_stream` invocations so far. Tests use this to
/// assert "exactly N LLM calls happened" without scanning the
/// HopRecord trace (the trace is the user-visible signal; this
/// is the lower-level call counter).
pub fn calls(&self) -> usize {
self.next.load(std::sync::atomic::Ordering::SeqCst)
}
/// Earliest byte position of any non-empty stop string in
/// `canned`. Same precedence rule as `MockLanguageModel`:
/// `Iterator::min` returns the first equal element, so ties
/// break by `stop` declaration order. `str::find` returns a
/// UTF-8 char boundary by contract, so the resulting prefix
/// slice is sound.
fn apply_stop<'a>(canned: &'a str, stop: &[String]) -> (&'a str, bool) {
let earliest = stop
.iter()
.filter(|s| !s.is_empty())
.filter_map(|s| canned.find(s.as_str()))
.min();
match earliest {
Some(idx) => (&canned[..idx], true),
None => (canned, false),
}
}
}
impl kebab_core::LanguageModel for ScriptedLm {
fn model_ref(&self) -> kebab_core::ModelRef {
kebab_core::ModelRef {
id: self.model_id.clone(),
provider: self.provider.clone(),
dimensions: None,
}
}
fn context_tokens(&self) -> usize {
self.context_tokens
}
fn generate_stream(
&self,
req: kebab_core::GenerateRequest,
) -> anyhow::Result<
Box<dyn Iterator<Item = anyhow::Result<kebab_core::TokenChunk>> + Send>,
> {
let idx = self
.next
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let canned = self.responses.get(idx).unwrap_or_else(|| {
panic!(
"ScriptedLm exhausted: call #{idx} requested but only {} responses scripted",
self.responses.len()
)
});
let (truncated, stop_hit) = Self::apply_stop(canned, &req.stop);
let mut chunks: Vec<kebab_core::TokenChunk> = truncated
.chars()
.map(|c| kebab_core::TokenChunk::Token(c.to_string()))
.collect();
let finish_reason = if stop_hit {
kebab_core::FinishReason::Stop
} else {
self.canned_finish.clone()
};
chunks.push(kebab_core::TokenChunk::Done {
finish_reason,
usage: self.canned_usage.clone(),
});
Ok(Box::new(chunks.into_iter().map(Ok)))
}
}

View File

@@ -0,0 +1,458 @@
//! p9-fb-41 PR-3b-ii: integration tests for the dynamic multi-hop
//! decide loop in [`RagPipeline::ask_multi_hop`].
//!
//! Each test uses [`ScriptedLm`] to drive a different LLM response
//! per call (decompose → 0..N decide → synthesize) and, where the
//! scenario requires, [`ScriptedRetriever`] to drive different hit
//! lists per retrieval round. The test fixture stays mock-only —
//! no Ollama / fastembed / LanceDB.
//!
//! Coverage:
//!
//! 1. `decide_stop_triggers_synthesize` — decide returns `[]`,
//! pipeline transitions straight to synthesize.
//! 2. `decide_continue_adds_more_chunks` — decide returns
//! `["q2"]`, iter 2 retrieves and grows the pool.
//! 3. `max_depth_force_stops` — `multi_hop_max_depth = 1` forces
//! `forced_stop = true` on the depth-1 decide hop and skips the
//! decide LLM call.
//! 4. `pool_chunks_dedup_by_chunk_id` — two sub-queries return the
//! same chunk; pool dedups by `chunk_id`.
//! 5. `decide_parse_failure_falls_through_to_synthesize` — decide
//! LLM emits non-JSON garbage; pipeline graceful-degrades to
//! synthesize (NOT a refusal).
mod common;
use std::sync::Arc;
use common::{RagEnv, ScriptedLm, ScriptedRetriever, id32, mk_hit};
use kebab_core::{HopKind, LanguageModel, RefusalReason, Retriever, SearchMode};
use kebab_rag::{AskOpts, RagPipeline};
/// Default `AskOpts` for multi-hop tests: deterministic seed,
/// lexical mode (so the test crate doesn't need to wire up an
/// embedder), and `multi_hop: true` to route through
/// `ask_multi_hop`.
fn multi_hop_opts() -> AskOpts {
AskOpts {
k: 5,
explain: false,
mode: SearchMode::Lexical,
temperature: Some(0.0),
seed: Some(0),
stream_sink: None,
history: Vec::new(),
conversation_id: None,
turn_index: None,
multi_hop: true,
}
}
// ── 1. decide returns [] → synthesize immediately ─────────────────────────
#[test]
fn multi_hop_decide_stop_triggers_synthesize() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]);
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever = Arc::new(ScriptedRetriever::new(vec![hits]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
// Three LLM calls in order: decompose → decide → synthesize.
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
r#"[]"#,
"answer body [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("compound", multi_hop_opts()).unwrap();
assert!(answer.grounded, "decide-stop synthesize must be grounded");
assert_eq!(answer.refusal_reason, None);
assert_eq!(
lm_handle.calls(),
3,
"decompose + decide + synthesize = 3 LLM calls"
);
assert_eq!(
retriever_handle.calls(),
1,
"single sub-query → single retrieval"
);
let hops = answer.hops.expect("multi-hop happy path stamps Some(hops)");
assert_eq!(hops.len(), 3, "[Decompose, Decide(stop), Synthesize]");
assert_eq!(hops[0].kind, HopKind::Decompose);
assert_eq!(hops[0].sub_queries, vec!["q1"]);
assert_eq!(hops[1].kind, HopKind::Decide);
assert!(
hops[1].sub_queries.is_empty(),
"decide stop signal → empty sub_queries on the HopRecord"
);
assert!(
!hops[1].forced_stop,
"LLM stop signal is NOT a forced_stop (forced_stop = cap-driven only)"
);
assert_eq!(hops[2].kind, HopKind::Synthesize);
}
// ── 2. decide ["q2"] → iter 2 retrieves and grows the pool ────────────────
#[test]
fn multi_hop_decide_continue_adds_more_chunks() {
let env = RagEnv::new();
let cid1 = id32("c1");
let did1 = id32("d1");
let cid2 = id32("c2");
let did2 = id32("d2");
env.seed_chunk(&cid1, &did1, "notes/a.md", "Chunk one.", &["A"]);
env.seed_chunk(&cid2, &did2, "notes/b.md", "Chunk two.", &["B"]);
// iter 1 retrieves chunk 1; iter 2 retrieves chunk 2 (different
// chunk_id → pool grows).
let retriever = Arc::new(ScriptedRetriever::new(vec![
vec![mk_hit(1, &cid1, &did1, "notes/a.md", 0.85, &["A"])],
vec![mk_hit(1, &cid2, &did2, "notes/b.md", 0.80, &["B"])],
]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
r#"["q2"]"#,
r#"[]"#,
"synthesized [#1] [#2]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("compound", multi_hop_opts()).unwrap();
assert!(answer.grounded);
assert_eq!(answer.refusal_reason, None);
assert_eq!(
lm_handle.calls(),
4,
"decompose + 2 decide + synthesize = 4 LLM calls"
);
assert_eq!(
retriever_handle.calls(),
2,
"iter 1 retrieves q1, iter 2 retrieves q2"
);
assert_eq!(
answer.retrieval.chunks_returned, 2,
"pool accumulates one new chunk per iter"
);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 4, "[Decompose, Decide(continue), Decide(stop), Synthesize]");
assert_eq!(hops[0].kind, HopKind::Decompose);
assert_eq!(hops[1].kind, HopKind::Decide);
assert_eq!(hops[1].sub_queries, vec!["q2"], "iter 1 decide emits q2");
assert_eq!(
hops[1].context_chunks_added, 1,
"iter 1 retrieve added chunk 1"
);
assert_eq!(hops[2].kind, HopKind::Decide);
assert!(hops[2].sub_queries.is_empty(), "iter 2 decide signals stop");
assert_eq!(
hops[2].context_chunks_added, 1,
"iter 2 retrieve added chunk 2"
);
assert_eq!(hops[3].kind, HopKind::Synthesize);
}
// ── 3. max_depth=1 → forced_stop, decide LLM call skipped ─────────────────

PR 의 핵심 변경에 대한 회귀 핀 누락: 이 PR 이 refuse_no_chunks / refuse_score_gate 시그니처를 widening 해서 hops: Option<Vec<HopRecord>> 받게 했고, ask_multi_hop 의 refusal 경로에서 누적된 hops 를 보존하도록 wiring 했다. 하지만 5 integration test 중 하나도 "multi-hop refusal 경로에서 Answer.hops 가 None 이 아니라 partial trace" 를 직접 검증하지 않는다.

구체 시나리오: ScriptedRetriever::new(vec![vec![]]) (iter 1 retrieve 가 0 hits) + decompose ok → pool 비어 있음 → refuse_no_chunks(Some(hops)) 분기. assert:

  • answer.refusal_reason == Some(NoChunks)
  • answer.hops is Some(non_empty) — 최소 Decompose + Decide(empty) 두 entry
  • hops[0].kind == Decompose

refuse_score_gate 쪽도 비슷한 케이스 하나 추가하면 양쪽 path 모두 핀. 회귀 위험 ↑ — 누군가 widening 을 reverting 해도 happy-path tests 5 건은 그대로 통과한다.

**PR 의 핵심 변경에 대한 회귀 핀 누락**: 이 PR 이 `refuse_no_chunks` / `refuse_score_gate` 시그니처를 widening 해서 `hops: Option<Vec<HopRecord>>` 받게 했고, ask_multi_hop 의 refusal 경로에서 누적된 hops 를 보존하도록 wiring 했다. 하지만 5 integration test 중 하나도 "multi-hop refusal 경로에서 Answer.hops 가 None 이 아니라 partial trace" 를 직접 검증하지 않는다. 구체 시나리오: `ScriptedRetriever::new(vec![vec![]])` (iter 1 retrieve 가 0 hits) + decompose ok → pool 비어 있음 → `refuse_no_chunks(Some(hops))` 분기. assert: - `answer.refusal_reason == Some(NoChunks)` - `answer.hops` is `Some(non_empty)` — 최소 Decompose + Decide(empty) 두 entry - `hops[0].kind == Decompose` refuse_score_gate 쪽도 비슷한 케이스 하나 추가하면 양쪽 path 모두 핀. 회귀 위험 ↑ — 누군가 widening 을 reverting 해도 happy-path tests 5 건은 그대로 통과한다.
#[test]
fn multi_hop_max_depth_force_stops() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]);
let mut cfg = env.config.clone();
// depth 1 means: iter 1 is the last iter, so the per-iter
// `depth_force_stop = iter >= max_depth` fires and the decide
// LLM call is skipped entirely.
cfg.rag.multi_hop_max_depth = 1;
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever = Arc::new(ScriptedRetriever::new(vec![hits]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
// Only 2 LLM calls scripted — decompose + synthesize. If the
// pipeline tries to call decide (a bug), ScriptedLm panics on
// exhaustion and the test fails loudly with the call index.
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
"answer [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline = RagPipeline::new(cfg, retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(answer.grounded);
assert_eq!(
lm_handle.calls(),
2,
"depth-cap skips decide → only decompose + synthesize"
);
assert_eq!(retriever_handle.calls(), 1);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 3, "[Decompose, Decide(forced_stop), Synthesize]");
assert_eq!(hops[1].kind, HopKind::Decide);
assert!(
hops[1].forced_stop,
"depth cap must surface forced_stop=true on the Decide hop"
);
assert!(
hops[1].sub_queries.is_empty(),
"skipped decide carries no sub_queries"
);
assert_eq!(
hops[1].llm_call_ms, 0,
"skipped decide records 0ms — no LLM call happened"
);
}
// ── 4. dedup: two sub-queries hit same chunk_id, pool keeps 1 ─────────────
#[test]
fn multi_hop_pool_chunks_dedup_by_chunk_id() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Shared chunk text.", &["X"]);
// Both sub-queries retrieve the same chunk_id — dedup must
// keep exactly one pool entry.
let shared_hit = mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["X"]);
let retriever = Arc::new(ScriptedRetriever::new(vec![
vec![shared_hit.clone()],
vec![shared_hit],
]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1", "q2"]"#,
r#"[]"#,
"merged answer [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(answer.grounded);
assert_eq!(
retriever_handle.calls(),
2,
"two sub-queries → two retrieval calls"
);
assert_eq!(
answer.retrieval.chunks_returned, 1,
"dedup by chunk_id keeps pool at 1"
);
assert_eq!(answer.citations.len(), 1, "only one chunk cited as [#1]");
assert_eq!(answer.citations[0].marker.as_deref(), Some("[1]"));
assert_eq!(
lm_handle.calls(),
3,
"decompose + decide + synthesize = 3"
);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 3, "[Decompose, Decide, Synthesize]");
assert_eq!(hops[0].sub_queries, vec!["q1", "q2"]);
assert_eq!(
hops[1].context_chunks_added, 1,
"dedup reduces 2 retrievals → 1 new pool entry"

redundant assertion (nit): assert_eq!(answer.refusal_reason, None) 다음에 바로 assert_ne!(answer.refusal_reason, Some(MultiHopDecomposeFailed)) 가 있는데, None != Some(_) 은 trivial 하게 참. 메시지의 의도 ("MultiHopDecomposeFailed is reserved for the initial decompose hop") 자체는 가치 있지만 assert 로는 정보 추가 안 함.

선택지:

  • assert_eq! 만 남기고 assert_ne! 제거 (메시지를 assert_eq! 의 msg 로 합치기 — "decide parse failure is graceful degrade, not refusal — MultiHopDecomposeFailed is reserved for the initial decompose hop").
  • 또는 assert_ne! 만 두고 assert_eq! 제거 — 의도는 "MultiHopDecomposeFailed 가 아니다" 가 핵심이므로.

둘 다 의도 동일. 한 assertion 으로 줄이고 메시지를 합치는 게 깔끔.

**redundant assertion (nit)**: `assert_eq!(answer.refusal_reason, None)` 다음에 바로 `assert_ne!(answer.refusal_reason, Some(MultiHopDecomposeFailed))` 가 있는데, `None != Some(_)` 은 trivial 하게 참. 메시지의 의도 ("MultiHopDecomposeFailed is reserved for the initial decompose hop") 자체는 가치 있지만 assert 로는 정보 추가 안 함. 선택지: - `assert_eq!` 만 남기고 `assert_ne!` 제거 (메시지를 `assert_eq!` 의 msg 로 합치기 — "decide parse failure is graceful degrade, not refusal — MultiHopDecomposeFailed is reserved for the initial decompose hop"). - 또는 `assert_ne!` 만 두고 `assert_eq!` 제거 — 의도는 "MultiHopDecomposeFailed 가 아니다" 가 핵심이므로. 둘 다 의도 동일. 한 assertion 으로 줄이고 메시지를 합치는 게 깔끔.
);
}
// ── 5. decide parse failure → graceful synthesize (NOT a refusal) ─────────
#[test]
fn multi_hop_decide_parse_failure_falls_through_to_synthesize() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]);
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever = Arc::new(ScriptedRetriever::new(vec![hits]));
let retriever_dyn: Arc<dyn Retriever> = retriever;
// Decide LLM emits non-JSON garbage. Spec §9: this is NOT a
// refusal — pipeline graceful-degrades to synthesize as if the
// decide had returned `[]`. Only the *initial* decompose's
// parse failure is a refusal (MultiHopDecomposeFailed).
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
"definitely not a JSON array",
"answer [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(
answer.grounded,
"decide parse failure must NOT block synthesis"
);
assert_eq!(
answer.refusal_reason, None,
"decide parse failure is graceful degrade, not refusal — \
MultiHopDecomposeFailed is reserved for the initial decompose hop"
);
assert_eq!(
lm_handle.calls(),
3,
"decompose + (garbage) decide + synthesize"
);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 3, "[Decompose, Decide(parse-fail→stop), Synthesize]");
assert_eq!(hops[1].kind, HopKind::Decide);
assert!(
hops[1].sub_queries.is_empty(),
"parse failure → empty sub_queries (same shape as LLM stop)"
);
assert!(
!hops[1].forced_stop,
"parse-degraded decide is not a cap-driven forced_stop — \
flag stays false even though we synthesize early"
);
}
// ── 6. refuse path: NoChunks preserves partial hop trace ──────────────────
//
// PR-3b-ii widens `refuse_no_chunks` to accept `hops:
// Option<Vec<HopRecord>>` and wires `ask_multi_hop` to forward the
// partial trace. This test pins that contract — a refusal Answer
// still carries the decompose + decide hops the loop accumulated
// before pool came up empty.
#[test]
fn multi_hop_refuse_no_chunks_preserves_hops_trace() {
let env = RagEnv::new();
// Retriever returns 0 hits — pool stays empty → refuse_no_chunks.
let retriever = Arc::new(ScriptedRetriever::new(vec![vec![]]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
// Only one LM call needed (decompose). Decide is skipped because
// `pool.is_empty()` triggers the (Vec::new(), 0) shortcut. If a
// bug calls the LM beyond decompose, ScriptedLm panics on
// exhaustion and the test fails loudly.
let lm = Arc::new(ScriptedLm::new(vec![r#"["q1"]"#]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(!answer.grounded);
assert_eq!(answer.refusal_reason, Some(RefusalReason::NoChunks));
assert_eq!(retriever_handle.calls(), 1, "single sub-query → single retrieve");
assert_eq!(lm_handle.calls(), 1, "decompose only — decide skipped (empty pool), no synthesize");
let hops = answer
.hops
.expect("PR-3b-ii: refuse_no_chunks must preserve the partial hop trace");
assert_eq!(
hops.len(),
2,
"[Decompose, Decide(empty_pool_skip)] — synthesize never ran"
);
assert_eq!(hops[0].kind, HopKind::Decompose);
assert_eq!(hops[0].sub_queries, vec!["q1"]);
assert_eq!(hops[1].kind, HopKind::Decide);
assert!(hops[1].sub_queries.is_empty());
assert_eq!(
hops[1].context_chunks_added, 0,
"retrieve returned 0 hits → 0 added to pool"
);
}
// ── 7. refuse path: ScoreGate preserves partial hop trace ─────────────────
#[test]
fn multi_hop_refuse_score_gate_preserves_hops_trace() {
let env = RagEnv::new();
let (cid, did) = seed_low_score_chunk(&env);
// Top score 0.10 is well below the default gate (0.30) — the
// score-gate refusal fires after the pool has been built, so
// the decide LLM call did run and the hop trace contains both
// Decompose and Decide entries.
let hits = vec![mk_hit(1, &cid, &did, "notes/low.md", 0.10, &["Low"])];
let retriever = Arc::new(ScriptedRetriever::new(vec![hits]));
let retriever_dyn: Arc<dyn Retriever> = retriever;
// decompose + decide (pool not empty so decide fires) — synthesize
// never runs because we refuse before pack_context.
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
r#"[]"#,
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(!answer.grounded);
assert_eq!(answer.refusal_reason, Some(RefusalReason::ScoreGate));
assert_eq!(
lm_handle.calls(),
2,
"decompose + decide ran; synthesize skipped by gate"
);
let hops = answer
.hops
.expect("PR-3b-ii: refuse_score_gate must preserve the partial hop trace");
assert_eq!(
hops.len(),
2,
"[Decompose, Decide(stop)] — synthesize never ran"
);
assert_eq!(hops[0].kind, HopKind::Decompose);
assert_eq!(hops[1].kind, HopKind::Decide);
assert_eq!(
hops[1].context_chunks_added, 1,
"the low-score chunk did enter the pool — gate fires after pool build"
);
}
/// Seed a chunk + return its `(chunk_id, doc_id)` pair. Helper for
/// the score-gate test so the test body stays focused on the
/// hop-trace assertions; returning the pair (instead of the chunk_id
/// alone) avoids the caller having to re-derive `id32("d_low")` and
/// keeps the id pair as a single source of truth.
fn seed_low_score_chunk(env: &RagEnv) -> (String, String) {

함수 이름 nit (actionable): i32_below_gate_chunki32_ prefix 는 Rust integer 타입 i32 와 충돌해서 첫인상이 헷갈린다 (의도는 id32 헬퍼를 부르는 wrapper 라는 뜻이겠지만 그 정보도 호출자에게 가치 없음). 더 의도가 드러나는 이름:

  • seed_below_gate_chunk (action + condition)
  • seed_low_score_chunk (action + 성격)

동시에 함수가 cid 만 반환하면 호출자가 id32("d_low")다시 만들어야 하는데 (line 410 의 mk_hit(1, &cid, &id32("d_low"), ...)), 이 doc_id 도 함수 안의 seed 와 같은 id32 결과라 dual source-of-truth. (chunk_id, doc_id) tuple 또는 바로 SearchHit 까지 만들어 반환하면 호출자 본문이 더 짧아지고 헬퍼-호출자 사이의 id 동기화 깨질 위험도 없어진다.

**함수 이름 nit (actionable)**: `i32_below_gate_chunk` 의 `i32_` prefix 는 Rust integer 타입 `i32` 와 충돌해서 첫인상이 헷갈린다 (의도는 `id32` 헬퍼를 부르는 wrapper 라는 뜻이겠지만 그 정보도 호출자에게 가치 없음). 더 의도가 드러나는 이름: - `seed_below_gate_chunk` (action + condition) - `seed_low_score_chunk` (action + 성격) 동시에 함수가 cid 만 반환하면 호출자가 `id32("d_low")` 를 *다시* 만들어야 하는데 (line 410 의 `mk_hit(1, &cid, &id32("d_low"), ...)`), 이 doc_id 도 함수 안의 seed 와 같은 id32 결과라 dual source-of-truth. `(chunk_id, doc_id)` tuple 또는 바로 `SearchHit` 까지 만들어 반환하면 호출자 본문이 더 짧아지고 헬퍼-호출자 사이의 id 동기화 깨질 위험도 없어진다.
let cid = id32("c_low");
let did = id32("d_low");
env.seed_chunk(&cid, &did, "notes/low.md", "low score text", &["Low"]);
(cid, did)
}