feat(rag): fb-41 PR-3b-ii — ScriptedLm + 5 multi-hop tests + refusal hop trace + carry-over

PR-3b 의 분할 두 번째 PR — PR-3b-i 의 dynamic decide loop 위에서:

1. **ScriptedLm + ScriptedRetriever helper** (kebab-rag tests/common/mod.rs)
   per-call 다른 response 반환. decompose / decide×N / synthesize 의 각
   LLM call 을 구분하는 다단계 multi-hop 시나리오를 mock-only 로 exercise
   가능. `Vec<&str>` / `Vec<Vec<SearchHit>>` 받아 call sequence 순서대로
   emit. Send + Sync.

2. **5 multi-hop integration tests** (kebab-rag tests/multi_hop.rs 신규)
   - decide_stop_triggers_synthesize: decide [] → 즉시 synthesize
   - decide_continue_adds_more_chunks: decide ["q2"] → iter 2 retrieve + pool 확장
   - max_depth_force_stops: depth cap → forced_stop + decide LLM call skip
   - pool_chunks_dedup_by_chunk_id: 같은 chunk_id 두 sub-query 에서 1 회
   - decide_parse_failure_falls_through_to_synthesize: parse fail = graceful
     synthesize (refusal 아님, spec §9)

3. **refuse_* helper hops trace 보존** (회차 1 carry-over)
   refuse_no_chunks / refuse_score_gate 시그니처에 `hops:
   Option<Vec<HopRecord>>` 인자 추가. ask_multi_hop 의 score-gate /
   no-chunks refusal 시 누적된 hops 그대로 Answer.hops 에 보존.
   single-pass ask 는 None 전달 — wire 변동 없음 (skip_serializing_if).

4. **HopRecord doc 보강** (회차 1 carry-over)
   sub_queries 의 per-kind 의미 명시 (Decompose=initial / Decide=next-iter
   or empty=stop / Synthesize=always empty). llm_call_ms=0 의 ambiguity
   (no call vs 0ms call) doc 명시.

5. **MULTI_HOP_MAX_SUB_QUERIES_DEFAULT → _HARD_CAP rename** (회차 1 carry-over)
   const 의 의도 명확화 — config knob `multi_hop_max_sub_queries_per_iter`
   (5, prompt-side soft hint) 와 const (10, parse-side hard ceiling)
   분리. 두 layer 의 책임 doc 동기화. test 도 rename.

6. **decide guard 단순화 + preview budget doc** (회차 1 carry-over)
   parse_decompose_response 의 post-condition (Some=non-empty 보장)
   doc 명시. defensive `Some(qs) if !qs.is_empty()` →
   `decide_result.unwrap_or_default()` 단순화. decide preview 의
   snippet-only path (full chunk text 안 fetch) 의도 doc.

검증
- `cargo test -p kebab-rag -j 1` — 31 unit + 19 pipeline + 5 multi_hop
  + 3 prompt_template + 3 streaming 모두 통과.
- `cargo clippy -p kebab-rag --all-targets -j 1 -- -D warnings` clean.

Spec / plan
- design: docs/superpowers/specs/2026-05-25-p9-fb-41-multi-hop-rag-design.md
- plan: docs/superpowers/plans/2026-05-25-p9-fb-41-multi-hop-rag.md (PR-3b 단락)

다음 단계 = PR-4 (CLI --multi-hop + wire schema + error_wire).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-25 08:17:37 +00:00
parent 94e6146013
commit 6188a50c1c
5 changed files with 648 additions and 41 deletions

View File

@@ -189,9 +189,13 @@ pub struct RagCfg {
#[serde(default = "default_multi_hop_max_depth")]
pub multi_hop_max_depth: u32,
/// p9-fb-41: cap on how many sub-queries the LLM may emit in a
/// single decompose / decide call. Mirrors
/// [`MULTI_HOP_MAX_SUB_QUERIES_DEFAULT`] in kebab-rag — the
/// const is the hard floor while this is the runtime knob.
/// single decompose / decide call. This is the *prompt-side
/// soft hint* — the value the pipeline injects into the
/// decompose / decide prompts so the LLM knows what to aim for.
/// kebab-rag enforces a separate compile-time hard ceiling
/// (`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`, currently 10) as a
/// safety net against misbehaving models — if you raise this
/// knob above the hard cap, bump the const in the same PR.
/// Default `5`.
#[serde(default = "default_multi_hop_max_sub_queries_per_iter")]
pub multi_hop_max_sub_queries_per_iter: u32,

View File

@@ -79,8 +79,18 @@ pub struct HopRecord {
/// the final iter is the synthesize call.
pub iter: u32,
pub kind: HopKind,
/// Sub-queries the LLM emitted at this iter. For the synthesize
/// hop this is empty (no sub-queries — just the final answer).
/// Sub-queries associated with this hop. The meaning depends on
/// `kind`:
///
/// - [`HopKind::Decompose`]: the initial sub-queries the LLM
/// broke the original user query into. These drive the
/// `iter=1` retrieval round.
/// - [`HopKind::Decide`]: the *new* sub-queries the LLM
/// emitted to drive the next retrieval round. Empty when the
/// LLM signalled stop OR when `forced_stop = true` (cap hit
/// or parse-degraded).
/// - [`HopKind::Synthesize`]: always empty — the final hop
/// produces the user-visible answer, not more sub-queries.
#[serde(default)]
pub sub_queries: Vec<String>,
/// Number of *new* chunks the retrieval round contributed to the
@@ -98,6 +108,13 @@ pub struct HopRecord {
/// Wall-clock latency of the LLM call for this hop, in
/// milliseconds. Useful for cost / latency analysis when a
/// `kebab eval` run records `Answer.hops`.
///
/// `0` is overloaded: it means "no LLM call happened at this
/// hop" when (a) the hop was a Decide skipped due to
/// `forced_stop` (depth-cap or pool-cap fired before the LLM
/// was asked) or (b) the pool was empty before any decide
/// could run. Treat `0` as "absent or instantaneous" rather
/// than as a genuine measurement.
pub llm_call_ms: u32,
}

View File

@@ -309,10 +309,10 @@ impl RagPipeline {
// ── 2. Score gate ──────────────────────────────────────────────────
if hits.is_empty() {
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(query, &opts, k_effective, started, None);
}
if top_score < self.config.rag.score_gate {
return self.refuse_score_gate(query, &opts, &hits, k_effective, started);
return self.refuse_score_gate(query, &opts, &hits, k_effective, started, None);
}
// ── 3. Pack context ────────────────────────────────────────────────
@@ -330,7 +330,7 @@ impl RagPipeline {
"kb-rag: all retrieved chunks were unfetchable from the store; \
falling back to NoChunks refusal"
);
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(query, &opts, k_effective, started, None);
}
// ── 4. Render prompt ───────────────────────────────────────────────
@@ -697,6 +697,16 @@ impl RagPipeline {
if forced_stop || pool.is_empty() {
(Vec::new(), 0)
} else {
// Snippet-based preview: each pool entry contributes
// its `SearchHit.snippet` (already truncated upstream
// by the retriever). `max_pool_chunks` acts as the
// implicit cap on this string's length — the loop
// breaks before we accumulate more pool entries.
// We intentionally do NOT route this through
// `pack_context` (no full chunk text fetch, no
// marker numbering): decide only needs gist to
// judge sufficiency, and full text is reserved for
// the terminal synthesize call.
let preview = pool
.iter()
.enumerate()
@@ -711,13 +721,14 @@ impl RagPipeline {
depth_remaining,
&opts,
)?;
match decide_result {
Some(qs) if !qs.is_empty() => (qs, ms),
// Empty array OR parse failure → stop signal.
// Parse failure is NOT a refusal — graceful
// degrade to early synthesize (per spec §9).
_ => (Vec::new(), ms),
}
// `parse_decompose_response` post-condition: when
// it returns `Some(qs)`, `qs` is guaranteed
// non-empty (and trimmed + hard-capped). `None`
// covers both "parse failure" and "empty array
// after trim" — both mean stop. Parse failure is
// NOT a refusal here (spec §9 — graceful degrade
// to early synthesize on the decide hop only).
(decide_result.unwrap_or_default(), ms)
};
hops.push(HopRecord {
@@ -756,11 +767,27 @@ impl RagPipeline {
let top_score = pool.first().map(|h| h.retrieval.fusion_score).unwrap_or(0.0);
// ── 3. Score gate / no chunks ──────────────────────────────────────
// PR-3b-ii: forward the partial hop trace into the refusal so
// a `--multi-hop` user can still see which decompose / decide
// signals fired before the score-gate / no-chunks bailout.
if pool.is_empty() {
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(
query,
&opts,
k_effective,
started,
Some(hops),
);
}
if top_score < self.config.rag.score_gate {
return self.refuse_score_gate(query, &opts, &pool, k_effective, started);
return self.refuse_score_gate(
query,
&opts,
&pool,
k_effective,
started,
Some(hops),
);
}
// ── 4. Pack context ────────────────────────────────────────────────
@@ -772,7 +799,13 @@ impl RagPipeline {
pool_size = pool.len(),
"kb-rag: multi-hop pool chunks all unfetchable; falling back to NoChunks"
);
return self.refuse_no_chunks(query, &opts, k_effective, started);
return self.refuse_no_chunks(
query,
&opts,
k_effective,
started,
Some(hops),
);
}
// ── 5. Synthesize prompt ───────────────────────────────────────────
@@ -1262,12 +1295,20 @@ impl RagPipeline {
/// Refusal path for empty hits — `RefusalReason::NoChunks`. No LLM
/// call. The persisted row records `chunks_returned = 0`.
///
/// `hops` is `None` on the single-pass path; the multi-hop path
/// (PR-3b-ii) forwards the partial hop trace accumulated up to
/// the refusal point so a `--multi-hop` user can still see which
/// decompose / decide signals fired before retrieval came up
/// empty. The trace is wire-additive (`Answer.hops` already
/// `skip_serializing_if = None`).
fn refuse_no_chunks(
&self,
query: &str,
opts: &AskOpts,
k_effective: usize,
started: std::time::Instant,
hops: Option<Vec<HopRecord>>,
) -> Result<Answer> {
let trace_id = mint_trace_id(query, 0.0, &self.llm.model_ref().id);
let elapsed_ms = u32::try_from(started.elapsed().as_millis()).unwrap_or(u32::MAX);
@@ -1298,11 +1339,12 @@ impl RagPipeline {
created_at: OffsetDateTime::now_utc(),
conversation_id: opts.conversation_id.clone(),
turn_index: opts.turn_index,
// p9-fb-41 Step 2 of PR-3: every Answer literal carries
// `hops`. Single-pass + refusal paths leave it `None`;
// only the multi-hop happy path will set `Some(...)` in
// Step 5 once the decide loop populates a hop trace.
hops: None,
// p9-fb-41 PR-3b-ii: single-pass callers pass `None`;
// `ask_multi_hop` forwards the partial hop trace it
// built up to the refusal point. Either way `Answer.hops`
// stays `skip_serializing_if = None`, so single-pass
// wire output is unchanged.
hops,
};
if let Err(e) = self.docs.put_answer(&answer, query, None) {
tracing::warn!(target: "kebab-rag", error = %e, "kb-rag: put_answer (NoChunks) failed");
@@ -1313,6 +1355,11 @@ impl RagPipeline {
/// Refusal path for top-1 below the gate — `RefusalReason::ScoreGate`.
/// No LLM call. Lists up to three near-miss candidates verbatim in
/// `answer` so the user gets actionable context.
///
/// `hops` is `None` on the single-pass path; the multi-hop path
/// (PR-3b-ii) forwards the partial hop trace accumulated before
/// the gate refusal. See [`Self::refuse_no_chunks`] for the
/// shared rationale.
fn refuse_score_gate(
&self,
query: &str,
@@ -1320,6 +1367,7 @@ impl RagPipeline {
hits: &[SearchHit],
k_effective: usize,
started: std::time::Instant,
hops: Option<Vec<HopRecord>>,
) -> Result<Answer> {
let top_score = hits[0].retrieval.fusion_score;
let gate = self.config.rag.score_gate;
@@ -1385,11 +1433,8 @@ impl RagPipeline {
created_at: OffsetDateTime::now_utc(),
conversation_id: opts.conversation_id.clone(),
turn_index: opts.turn_index,
// p9-fb-41 Step 2 of PR-3: every Answer literal carries
// `hops`. Single-pass + refusal paths leave it `None`;
// only the multi-hop happy path will set `Some(...)` in
// Step 5 once the decide loop populates a hop trace.
hops: None,
// p9-fb-41 PR-3b-ii: see refuse_no_chunks' identical comment.
hops,
};
if let Err(e) = self.docs.put_answer(&answer, query, None) {
tracing::warn!(target: "kebab-rag", error = %e, "kb-rag: put_answer (ScoreGate) failed");
@@ -1443,11 +1488,27 @@ fn compute_stale(
/// (design §9) can tell the two paths apart in `eval_runs.config_snapshot_json`.
pub(crate) const PROMPT_TEMPLATE_VERSION_MULTI_HOP: &str = "rag-multi-hop-v1";
/// Max sub-questions the decompose call may emit per iteration. PR-2
/// ships with a fixed depth=2 pipeline (decompose once + synthesize),
/// so this cap acts on the single decompose call. PR-3 (dynamic iter)
/// will reuse the same cap for the per-iter decide call too.
pub(crate) const MULTI_HOP_MAX_SUB_QUERIES_DEFAULT: usize = 5;
/// Hard parse-side cap on how many sub-question strings
/// [`parse_decompose_response`] will accept from a single LLM response,
/// regardless of `RagCfg.multi_hop_max_sub_queries_per_iter`.
///
/// This is intentionally a *defensive* compile-time cap, not the
/// user-tunable knob:
///
/// - `RagCfg.multi_hop_max_sub_queries_per_iter` (default 5) is the
/// *prompt-side soft hint* — the value the pipeline injects into the
/// decompose / decide prompts so the LLM knows what to aim for.
/// Users can raise it via config / env.
/// - `MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP` (this const) is the
/// *parse-side hard ceiling* — a misbehaving model that emits 100
/// sub-questions gets cropped here so the retrieve loop does not
/// spawn an unbounded number of search calls per iter.
///
/// In practice this is generous (the soft hint is 5 by default, the
/// hard cap is 10) so user-tunable expansion to ~10 stays unaffected.
/// If a future config knob exceeds this number, raise the const in
/// the same PR.
pub(crate) const MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP: usize = 10;
const MULTI_HOP_DECOMPOSE_SYSTEM_PROMPT: &str = "당신은 사용자의 질문을 다단계 검색에 필요한 sub-question 들로 분해하는 도구다.\n- multi-hop 정보가 필요한 경우 독립적으로 검색 가능한 sub-question 들로 분해한다.\n- 각 sub-question 은 자기 자신만으로 의미가 통해야 한다 (대명사 / \"위 답변\" 같은 reference 금지).\n- 원본이 이미 단순하면 원본 그대로 1 개만 반환한다.\n- 응답은 JSON array of strings 만 출력한다. 다른 prose / markdown fence / 설명 금지.";
@@ -1590,13 +1651,21 @@ fn mint_trace_id(query: &str, top_score: f32, model_id: &str) -> TraceId {
/// into a vector of sub-question strings. Strips a leading markdown
/// code-fence (```json ... ```), then deserializes as a JSON array of
/// strings, then trims each entry + drops empties + caps at
/// [`MULTI_HOP_MAX_SUB_QUERIES_DEFAULT`].
/// [`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`].
///
/// Returns `None` when:
/// - parse fails outright (not a JSON array of strings),
/// - the array deserializes but is empty after trim/drop,
///
/// in which case the caller surfaces `RefusalReason::MultiHopDecomposeFailed`.
/// in which case the caller surfaces `RefusalReason::MultiHopDecomposeFailed`
/// (for the initial decompose hop) or treats the signal as
/// "synthesize now" (for the decide hop — see
/// [`RagPipeline::multi_hop_decide`]).
///
/// `Some(non_empty)` is the only success shape: the post-conditions
/// guarantee at least one trimmed non-empty entry, capped at
/// [`MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP`]. Callers therefore do
/// not need a defensive `if !qs.is_empty()` guard.
fn parse_decompose_response(raw: &str) -> Option<Vec<String>> {
let stripped = strip_markdown_json_fence(raw.trim());
let arr: Vec<String> = serde_json::from_str(stripped).ok()?;
@@ -1604,7 +1673,7 @@ fn parse_decompose_response(raw: &str) -> Option<Vec<String>> {
.into_iter()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.take(MULTI_HOP_MAX_SUB_QUERIES_DEFAULT)
.take(MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP)
.collect();
if cleaned.is_empty() {
None
@@ -1696,12 +1765,15 @@ mod tests {
}
#[test]
fn parse_decompose_response_caps_at_max_sub_queries() {
// 7 entries; cap is MULTI_HOP_MAX_SUB_QUERIES_DEFAULT (=5).
let raw = r#"["a","b","c","d","e","f","g"]"#;
fn parse_decompose_response_caps_at_hard_cap() {
// 12 entries; parse-side hard cap = 10. Pins the cap and the
// truncation order (first-N keep, tail drop) so a future
// refactor can't accidentally relax the safety ceiling or
// re-order the take/filter chain.
let raw = r#"["a","b","c","d","e","f","g","h","i","j","k","l"]"#;
let out = parse_decompose_response(raw).unwrap();
assert_eq!(out.len(), MULTI_HOP_MAX_SUB_QUERIES_DEFAULT);
assert_eq!(out, vec!["a", "b", "c", "d", "e"]);
assert_eq!(out.len(), MULTI_HOP_MAX_SUB_QUERIES_HARD_CAP);
assert_eq!(out, vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]);
}
#[test]

View File

@@ -206,6 +206,47 @@ impl Retriever for MockRetriever {
}
}
/// p9-fb-41 PR-3b-ii: scripted retriever. Returns a different
/// `Vec<SearchHit>` per `search` call from a pre-supplied sequence,
/// so a multi-hop test can simulate "iter 1 returns chunk A, iter 2
/// returns chunks A+B" (pool dedup) or "different sub-queries hit
/// different docs". Exhaustion returns an empty `Vec` (no panic) —
/// the pipeline already handles "no hits this round" gracefully via
/// the dedup loop, and a panic would conflate "test forgot a row"
/// with "pipeline made an unexpected extra retrieval call".
///
/// Use [`ScriptedRetriever::calls`] to assert the expected number
/// of retrievals occurred.
pub struct ScriptedRetriever {
hits_per_call: Vec<Vec<SearchHit>>,
next: std::sync::atomic::AtomicUsize,
}
impl ScriptedRetriever {
pub fn new(hits_per_call: Vec<Vec<SearchHit>>) -> Self {
Self {
hits_per_call,
next: std::sync::atomic::AtomicUsize::new(0),
}
}
pub fn calls(&self) -> usize {
self.next.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl Retriever for ScriptedRetriever {
fn search(&self, _q: &SearchQuery) -> anyhow::Result<Vec<SearchHit>> {
let idx = self
.next
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(self.hits_per_call.get(idx).cloned().unwrap_or_default())
}
fn index_version(&self) -> IndexVersion {
IndexVersion("test-iv".to_string())
}
}
/// Pad a short prefix to the 32-hex shape `kebab_core` newtypes expect.
pub fn id32(prefix: &str) -> String {
let mut s = prefix.to_string();
@@ -215,3 +256,128 @@ pub fn id32(prefix: &str) -> String {
s.truncate(32);
s
}
/// p9-fb-41 PR-3b-ii: scripted language model. Returns a different
/// canned response per `generate_stream` call from a pre-supplied
/// `Vec<String>`. Mirrors `MockLanguageModel`'s streaming contract
/// (one `TokenChunk::Token` per Unicode scalar, terminal `Done` with
/// `canned_usage`, stop-string truncation honoured) but lets a test
/// distinguish the decompose / per-iter decide / synthesize LLM calls
/// of `RagPipeline::ask_multi_hop` — each can return a different
/// payload (`["q1","q2"]`, `[]`, `"final answer [#1]"`, etc.).
///
/// Internally `Arc<Vec<String>> + AtomicUsize` so the type is
/// `Send + Sync` and can be wrapped in `Arc<dyn LanguageModel>`.
/// Tests can read `calls()` for an assertion on the expected LLM
/// call count.
///
/// Exhaustion (more calls than scripted responses) panics — tests
/// that need an "infinite" final response can supply a longer
/// script; the panic message names the call index so the test
/// failure points at the missing entry.
pub struct ScriptedLm {
model_id: String,
provider: String,
context_tokens: usize,
/// Canned responses in call order. Index `i` is returned on the
/// `i`-th `generate_stream` call (0-based).
responses: Vec<String>,
/// 0-based index of the next response to return on `generate_stream`.
next: std::sync::atomic::AtomicUsize,
canned_finish: kebab_core::FinishReason,
canned_usage: kebab_core::TokenUsage,
}
impl ScriptedLm {
/// Build a scripted LM with the default model_id/provider used by
/// the rest of the test suite (`mock-lm` / `mock`) and the
/// MockLanguageModel-equivalent canned usage. Call `with_*`
/// builders if a test needs to override the defaults.
pub fn new(responses: Vec<&str>) -> Self {
Self {
model_id: "mock-lm".to_string(),
provider: "mock".to_string(),
context_tokens: 32_768,
responses: responses.into_iter().map(str::to_string).collect(),
next: std::sync::atomic::AtomicUsize::new(0),
canned_finish: kebab_core::FinishReason::Stop,
canned_usage: kebab_core::TokenUsage {
prompt_tokens: 10,
completion_tokens: 5,
latency_ms: 7,
},
}
}
/// Total `generate_stream` invocations so far. Tests use this to
/// assert "exactly N LLM calls happened" without scanning the
/// HopRecord trace (the trace is the user-visible signal; this
/// is the lower-level call counter).
pub fn calls(&self) -> usize {
self.next.load(std::sync::atomic::Ordering::SeqCst)
}
/// Earliest byte position of any non-empty stop string in
/// `canned`. Same precedence rule as `MockLanguageModel`:
/// `Iterator::min` returns the first equal element, so ties
/// break by `stop` declaration order. `str::find` returns a
/// UTF-8 char boundary by contract, so the resulting prefix
/// slice is sound.
fn apply_stop<'a>(canned: &'a str, stop: &[String]) -> (&'a str, bool) {
let earliest = stop
.iter()
.filter(|s| !s.is_empty())
.filter_map(|s| canned.find(s.as_str()))
.min();
match earliest {
Some(idx) => (&canned[..idx], true),
None => (canned, false),
}
}
}
impl kebab_core::LanguageModel for ScriptedLm {
fn model_ref(&self) -> kebab_core::ModelRef {
kebab_core::ModelRef {
id: self.model_id.clone(),
provider: self.provider.clone(),
dimensions: None,
}
}
fn context_tokens(&self) -> usize {
self.context_tokens
}
fn generate_stream(
&self,
req: kebab_core::GenerateRequest,
) -> anyhow::Result<
Box<dyn Iterator<Item = anyhow::Result<kebab_core::TokenChunk>> + Send>,
> {
let idx = self
.next
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let canned = self.responses.get(idx).unwrap_or_else(|| {
panic!(
"ScriptedLm exhausted: call #{idx} requested but only {} responses scripted",
self.responses.len()
)
});
let (truncated, stop_hit) = Self::apply_stop(canned, &req.stop);
let mut chunks: Vec<kebab_core::TokenChunk> = truncated
.chars()
.map(|c| kebab_core::TokenChunk::Token(c.to_string()))
.collect();
let finish_reason = if stop_hit {
kebab_core::FinishReason::Stop
} else {
self.canned_finish.clone()
};
chunks.push(kebab_core::TokenChunk::Done {
finish_reason,
usage: self.canned_usage.clone(),
});
Ok(Box::new(chunks.into_iter().map(Ok)))
}
}

View File

@@ -0,0 +1,348 @@
//! p9-fb-41 PR-3b-ii: integration tests for the dynamic multi-hop
//! decide loop in [`RagPipeline::ask_multi_hop`].
//!
//! Each test uses [`ScriptedLm`] to drive a different LLM response
//! per call (decompose → 0..N decide → synthesize) and, where the
//! scenario requires, [`ScriptedRetriever`] to drive different hit
//! lists per retrieval round. The test fixture stays mock-only —
//! no Ollama / fastembed / LanceDB.
//!
//! Coverage:
//!
//! 1. `decide_stop_triggers_synthesize` — decide returns `[]`,
//! pipeline transitions straight to synthesize.
//! 2. `decide_continue_adds_more_chunks` — decide returns
//! `["q2"]`, iter 2 retrieves and grows the pool.
//! 3. `max_depth_force_stops` — `multi_hop_max_depth = 1` forces
//! `forced_stop = true` on the depth-1 decide hop and skips the
//! decide LLM call.
//! 4. `pool_chunks_dedup_by_chunk_id` — two sub-queries return the
//! same chunk; pool dedups by `chunk_id`.
//! 5. `decide_parse_failure_falls_through_to_synthesize` — decide
//! LLM emits non-JSON garbage; pipeline graceful-degrades to
//! synthesize (NOT a refusal).
mod common;
use std::sync::Arc;
use common::{RagEnv, ScriptedLm, ScriptedRetriever, id32, mk_hit};
use kebab_core::{HopKind, LanguageModel, RefusalReason, Retriever, SearchMode};
use kebab_rag::{AskOpts, RagPipeline};
/// Default `AskOpts` for multi-hop tests: deterministic seed,
/// lexical mode (so the test crate doesn't need to wire up an
/// embedder), and `multi_hop: true` to route through
/// `ask_multi_hop`.
fn multi_hop_opts() -> AskOpts {
AskOpts {
k: 5,
explain: false,
mode: SearchMode::Lexical,
temperature: Some(0.0),
seed: Some(0),
stream_sink: None,
history: Vec::new(),
conversation_id: None,
turn_index: None,
multi_hop: true,
}
}
// ── 1. decide returns [] → synthesize immediately ─────────────────────────
#[test]
fn multi_hop_decide_stop_triggers_synthesize() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]);
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever = Arc::new(ScriptedRetriever::new(vec![hits]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
// Three LLM calls in order: decompose → decide → synthesize.
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
r#"[]"#,
"answer body [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("compound", multi_hop_opts()).unwrap();
assert!(answer.grounded, "decide-stop synthesize must be grounded");
assert_eq!(answer.refusal_reason, None);
assert_eq!(
lm_handle.calls(),
3,
"decompose + decide + synthesize = 3 LLM calls"
);
assert_eq!(
retriever_handle.calls(),
1,
"single sub-query → single retrieval"
);
let hops = answer.hops.expect("multi-hop happy path stamps Some(hops)");
assert_eq!(hops.len(), 3, "[Decompose, Decide(stop), Synthesize]");
assert_eq!(hops[0].kind, HopKind::Decompose);
assert_eq!(hops[0].sub_queries, vec!["q1"]);
assert_eq!(hops[1].kind, HopKind::Decide);
assert!(
hops[1].sub_queries.is_empty(),
"decide stop signal → empty sub_queries on the HopRecord"
);
assert!(
!hops[1].forced_stop,
"LLM stop signal is NOT a forced_stop (forced_stop = cap-driven only)"
);
assert_eq!(hops[2].kind, HopKind::Synthesize);
}
// ── 2. decide ["q2"] → iter 2 retrieves and grows the pool ────────────────
#[test]
fn multi_hop_decide_continue_adds_more_chunks() {
let env = RagEnv::new();
let cid1 = id32("c1");
let did1 = id32("d1");
let cid2 = id32("c2");
let did2 = id32("d2");
env.seed_chunk(&cid1, &did1, "notes/a.md", "Chunk one.", &["A"]);
env.seed_chunk(&cid2, &did2, "notes/b.md", "Chunk two.", &["B"]);
// iter 1 retrieves chunk 1; iter 2 retrieves chunk 2 (different
// chunk_id → pool grows).
let retriever = Arc::new(ScriptedRetriever::new(vec![
vec![mk_hit(1, &cid1, &did1, "notes/a.md", 0.85, &["A"])],
vec![mk_hit(1, &cid2, &did2, "notes/b.md", 0.80, &["B"])],
]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
r#"["q2"]"#,
r#"[]"#,
"synthesized [#1] [#2]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("compound", multi_hop_opts()).unwrap();
assert!(answer.grounded);
assert_eq!(answer.refusal_reason, None);
assert_eq!(
lm_handle.calls(),
4,
"decompose + 2 decide + synthesize = 4 LLM calls"
);
assert_eq!(
retriever_handle.calls(),
2,
"iter 1 retrieves q1, iter 2 retrieves q2"
);
assert_eq!(
answer.retrieval.chunks_returned, 2,
"pool accumulates one new chunk per iter"
);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 4, "[Decompose, Decide(continue), Decide(stop), Synthesize]");
assert_eq!(hops[0].kind, HopKind::Decompose);
assert_eq!(hops[1].kind, HopKind::Decide);
assert_eq!(hops[1].sub_queries, vec!["q2"], "iter 1 decide emits q2");
assert_eq!(
hops[1].context_chunks_added, 1,
"iter 1 retrieve added chunk 1"
);
assert_eq!(hops[2].kind, HopKind::Decide);
assert!(hops[2].sub_queries.is_empty(), "iter 2 decide signals stop");
assert_eq!(
hops[2].context_chunks_added, 1,
"iter 2 retrieve added chunk 2"
);
assert_eq!(hops[3].kind, HopKind::Synthesize);
}
// ── 3. max_depth=1 → forced_stop, decide LLM call skipped ─────────────────
#[test]
fn multi_hop_max_depth_force_stops() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]);
let mut cfg = env.config.clone();
// depth 1 means: iter 1 is the last iter, so the per-iter
// `depth_force_stop = iter >= max_depth` fires and the decide
// LLM call is skipped entirely.
cfg.rag.multi_hop_max_depth = 1;
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever = Arc::new(ScriptedRetriever::new(vec![hits]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
// Only 2 LLM calls scripted — decompose + synthesize. If the
// pipeline tries to call decide (a bug), ScriptedLm panics on
// exhaustion and the test fails loudly with the call index.
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
"answer [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline = RagPipeline::new(cfg, retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(answer.grounded);
assert_eq!(
lm_handle.calls(),
2,
"depth-cap skips decide → only decompose + synthesize"
);
assert_eq!(retriever_handle.calls(), 1);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 3, "[Decompose, Decide(forced_stop), Synthesize]");
assert_eq!(hops[1].kind, HopKind::Decide);
assert!(
hops[1].forced_stop,
"depth cap must surface forced_stop=true on the Decide hop"
);
assert!(
hops[1].sub_queries.is_empty(),
"skipped decide carries no sub_queries"
);
assert_eq!(
hops[1].llm_call_ms, 0,
"skipped decide records 0ms — no LLM call happened"
);
}
// ── 4. dedup: two sub-queries hit same chunk_id, pool keeps 1 ─────────────
#[test]
fn multi_hop_pool_chunks_dedup_by_chunk_id() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Shared chunk text.", &["X"]);
// Both sub-queries retrieve the same chunk_id — dedup must
// keep exactly one pool entry.
let shared_hit = mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["X"]);
let retriever = Arc::new(ScriptedRetriever::new(vec![
vec![shared_hit.clone()],
vec![shared_hit],
]));
let retriever_handle = retriever.clone();
let retriever_dyn: Arc<dyn Retriever> = retriever;
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1", "q2"]"#,
r#"[]"#,
"merged answer [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(answer.grounded);
assert_eq!(
retriever_handle.calls(),
2,
"two sub-queries → two retrieval calls"
);
assert_eq!(
answer.retrieval.chunks_returned, 1,
"dedup by chunk_id keeps pool at 1"
);
assert_eq!(answer.citations.len(), 1, "only one chunk cited as [#1]");
assert_eq!(answer.citations[0].marker.as_deref(), Some("[1]"));
assert_eq!(
lm_handle.calls(),
3,
"decompose + decide + synthesize = 3"
);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 3, "[Decompose, Decide, Synthesize]");
assert_eq!(hops[0].sub_queries, vec!["q1", "q2"]);
assert_eq!(
hops[1].context_chunks_added, 1,
"dedup reduces 2 retrievals → 1 new pool entry"
);
}
// ── 5. decide parse failure → graceful synthesize (NOT a refusal) ─────────
#[test]
fn multi_hop_decide_parse_failure_falls_through_to_synthesize() {
let env = RagEnv::new();
let cid = id32("c1");
let did = id32("d1");
env.seed_chunk(&cid, &did, "notes/a.md", "Body text.", &["Intro"]);
let hits = vec![mk_hit(1, &cid, &did, "notes/a.md", 0.85, &["Intro"])];
let retriever = Arc::new(ScriptedRetriever::new(vec![hits]));
let retriever_dyn: Arc<dyn Retriever> = retriever;
// Decide LLM emits non-JSON garbage. Spec §9: this is NOT a
// refusal — pipeline graceful-degrades to synthesize as if the
// decide had returned `[]`. Only the *initial* decompose's
// parse failure is a refusal (MultiHopDecomposeFailed).
let lm = Arc::new(ScriptedLm::new(vec![
r#"["q1"]"#,
"definitely not a JSON array",
"answer [#1]",
]));
let lm_handle = lm.clone();
let lm_dyn: Arc<dyn LanguageModel> = lm;
let pipeline =
RagPipeline::new(env.config.clone(), retriever_dyn, lm_dyn, env.sqlite.clone());
let answer = pipeline.ask("q", multi_hop_opts()).unwrap();
assert!(
answer.grounded,
"decide parse failure must NOT block synthesis"
);
assert_eq!(
answer.refusal_reason, None,
"decide parse failure is graceful degrade, not refusal"
);
assert_ne!(
answer.refusal_reason,
Some(RefusalReason::MultiHopDecomposeFailed),
"MultiHopDecomposeFailed is reserved for the initial decompose hop"
);
assert_eq!(
lm_handle.calls(),
3,
"decompose + (garbage) decide + synthesize"
);
let hops = answer.hops.expect("happy path stamps hops");
assert_eq!(hops.len(), 3, "[Decompose, Decide(parse-fail→stop), Synthesize]");
assert_eq!(hops[1].kind, HopKind::Decide);
assert!(
hops[1].sub_queries.is_empty(),
"parse failure → empty sub_queries (same shape as LLM stop)"
);
assert!(
!hops[1].forced_stop,
"parse-degraded decide is not a cap-driven forced_stop — \
flag stays false even though we synthesize early"
);
}