Files
kebab/crates/kebab-llm-local/tests/streaming.rs
altair823 911fb49550 refactor(rename): kb crates → kebab — Cargo packages, folders, Rust modules
프로젝트 이름 `kb` → `kebab` rename 의 첫 단계.

- workspace `Cargo.toml`: members `crates/kb-*` → `crates/kebab-*`,
  repository URL `altair823/kb` → `altair823/kebab`.
- 18 crate 폴더 rename via `git mv` (history 보존).
- 각 crate `Cargo.toml`: `name = "kb-*"` → `"kebab-*"`, path deps
  `../kb-*` → `../kebab-*`.
- 모든 `.rs`: `kb_<id>` snake-case 모듈 path 18 개 (`kb_core`,
  `kb_config`, `kb_app`, `kb_cli`, `kb_eval`, `kb_search`, `kb_chunk`,
  `kb_normalize`, `kb_source_fs`, `kb_parse_md`, `kb_parse_types`,
  `kb_store_sqlite`, `kb_store_vector`, `kb_embed`, `kb_embed_local`,
  `kb_llm`, `kb_llm_local`, `kb_rag`) → `kebab_<id>` 일괄 sed (단어
  경계 \\b 사용해 영어 문장 안의 "kb" 약어 미오염).

CLI binary 이름 (`[[bin]] name = "kb"`), 환경변수 `KB_*`, XDG paths,
tracing target, 그리고 docs sweep 은 다음 commit 에서.

## 검증

- `cargo check --workspace` clean — 모든 crate 빌드 통과 후 commit.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 03:28:08 +00:00

473 lines
19 KiB
Rust

//! End-to-end streaming tests against a `wiremock`-hosted mock HTTP server.
//!
//! `wiremock` is async, so the test functions are `#[tokio::test]`; the
//! adapter under test stays sync and is called from `spawn_blocking` to
//! preserve the "no async runtime in the runtime crate" invariant. Tokio
//! is a `dev-dependency` only — `cargo tree -p kb-llm-local --edges no-dev`
//! must not list it.
//!
//! Each test pins one behavior from design §7.2 / §11.2: streaming order,
//! error mapping, finish-reason mapping, missing-counter degradation, and
//! determinism semantics.
use kebab_config::Config;
use kebab_core::{FinishReason, GenerateRequest, TokenChunk};
use kebab_llm_local::{LanguageModel, LlmError, OllamaLanguageModel};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Build a `Config` whose `models.llm.endpoint` points at the wiremock
/// server. Other fields are left at their `Config::defaults()` values so
/// tests pin the same `model` id the production code will use.
fn cfg_for_endpoint(endpoint: &str) -> Config {
let mut cfg = Config::defaults();
cfg.models.llm.endpoint = endpoint.to_string();
// Keep model id stable for the ModelNotPulled test below.
cfg.models.llm.model = "qwen2.5:7b-instruct".to_string();
cfg
}
fn sample_request() -> GenerateRequest {
GenerateRequest {
system: "you are a test".to_string(),
user: "hello".to_string(),
stop: vec![],
max_tokens: 64,
temperature: 0.0,
seed: Some(0),
}
}
/// Helper: drive `generate_stream` to completion on a blocking thread so
/// the sync `OllamaLanguageModel` stays off the async runtime.
async fn collect_chunks(
cfg: Config,
req: GenerateRequest,
) -> anyhow::Result<Vec<TokenChunk>> {
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<TokenChunk>> {
let llm = OllamaLanguageModel::new(&cfg)?;
let stream = llm.generate_stream(req)?;
stream.collect::<Result<Vec<_>, _>>()
})
.await
.expect("blocking task panicked")
}
/// Same as `collect_chunks`, but returns the boxed `anyhow::Error` from
/// `generate_stream` itself (rather than a stream-mid error). Used by the
/// "unreachable endpoint" / "model not pulled" tests where the error
/// surfaces on `.send()` before any chunks flow.
async fn run_expecting_request_error(
cfg: Config,
req: GenerateRequest,
) -> anyhow::Error {
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let llm = OllamaLanguageModel::new(&cfg)?;
let _stream = llm.generate_stream(req)?;
Ok(())
})
.await
.expect("blocking task panicked")
.expect_err("expected generate_stream / new to return Err")
}
// ── Happy path ────────────────────────────────────────────────────────────
#[tokio::test]
async fn streamed_response_produces_tokens_then_done() {
let server = MockServer::start().await;
let body = concat!(
r#"{"response":"hi","done":false}"#, "\n",
r#"{"response":" there","done":false}"#, "\n",
r#"{"response":"","done":true,"done_reason":"stop","prompt_eval_count":3,"eval_count":2,"total_duration":1500000}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request())
.await
.expect("stream should complete");
assert_eq!(chunks.len(), 3, "expected 2 tokens + 1 done");
assert!(matches!(&chunks[0], TokenChunk::Token(t) if t == "hi"));
assert!(matches!(&chunks[1], TokenChunk::Token(t) if t == " there"));
match &chunks[2] {
TokenChunk::Done { finish_reason, usage } => {
assert!(matches!(finish_reason, FinishReason::Stop));
assert_eq!(usage.prompt_tokens, 3);
assert_eq!(usage.completion_tokens, 2);
// 1_500_000 ns / 1_000_000 = 1 ms.
assert_eq!(usage.latency_ms, 1);
}
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn concat_of_streamed_tokens_equals_full_text() {
let server = MockServer::start().await;
let pieces = ["The ", "quick ", "brown ", "fox"];
let mut body = String::new();
for p in &pieces {
body.push_str(&format!(r#"{{"response":"{p}","done":false}}"#));
body.push('\n');
}
body.push_str(r#"{"response":"","done":true,"done_reason":"stop","prompt_eval_count":1,"eval_count":4,"total_duration":0}"#);
body.push('\n');
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request())
.await
.unwrap();
let joined: String = chunks
.iter()
.filter_map(|c| match c {
TokenChunk::Token(t) => Some(t.as_str()),
_ => None,
})
.collect();
assert_eq!(joined, "The quick brown fox");
}
// ── UTF-8 / Korean ────────────────────────────────────────────────────────
#[tokio::test]
async fn multibyte_chars_within_a_line_round_trip() {
// The "split across HTTP chunks" concern in the spec is about
// reqwest's transport-level chunk boundaries; for line-delimited
// JSON, the BufReader's `read_until(b'\n')` accumulates until newline
// regardless of HTTP chunk boundary, so the UTF-8 boundary issue is
// moot for *complete* lines. This test verifies that multi-byte
// payloads inside a single line round-trip correctly — covering the
// common case where a Korean / Japanese / emoji token spans 3+ bytes.
// (Test name is honest about scope: it does NOT exercise cross-HTTP
// -chunk reassembly — that's structurally infeasible to set up given
// the line-delimited framing.)
let server = MockServer::start().await;
let body = concat!(
// "한국어" (Korean) — each char is 3 bytes in UTF-8.
r#"{"response":"한국어","done":false}"#, "\n",
// Followed by an emoji ZWJ sequence (4 bytes per scalar).
r#"{"response":"🦀","done":false}"#, "\n",
r#"{"response":"","done":true,"done_reason":"stop","prompt_eval_count":1,"eval_count":4,"total_duration":0}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request())
.await
.unwrap();
let joined: String = chunks
.iter()
.filter_map(|c| match c {
TokenChunk::Token(t) => Some(t.as_str()),
_ => None,
})
.collect();
assert_eq!(joined, "한국어🦀");
}
// ── Error mapping ─────────────────────────────────────────────────────────
#[tokio::test]
async fn unreachable_endpoint_maps_to_unreachable_error() {
// Port 1 is reserved (tcpmux) and almost never bound on a dev box —
// a synchronous `connect` returns ECONNREFUSED immediately, which
// reqwest reports as `is_connect()`.
let mut cfg = Config::defaults();
cfg.models.llm.endpoint = "http://127.0.0.1:1".to_string();
let err = run_expecting_request_error(cfg, sample_request()).await;
let llm_err = err
.downcast_ref::<LlmError>()
.unwrap_or_else(|| panic!("expected LlmError, got: {err:?}"));
match llm_err {
LlmError::Unreachable { endpoint, .. } => {
assert_eq!(endpoint, "http://127.0.0.1:1");
}
other => panic!("expected LlmError::Unreachable, got {other:?}"),
}
// The Display string MUST carry the actionable hint per §10.
let rendered = format!("{err}");
assert!(
rendered.contains("ollama serve"),
"missing actionable hint in: {rendered}"
);
}
#[tokio::test]
async fn model_not_found_maps_to_model_not_pulled() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(404).set_body_string(
r#"{"error":"model 'qwen2.5:7b-instruct' not found, try pulling it first"}"#,
))
.mount(&server)
.await;
let err = run_expecting_request_error(cfg_for_endpoint(&server.uri()), sample_request()).await;
let llm_err = err
.downcast_ref::<LlmError>()
.unwrap_or_else(|| panic!("expected LlmError, got: {err:?}"));
match llm_err {
LlmError::ModelNotPulled(model) => assert_eq!(model, "qwen2.5:7b-instruct"),
other => panic!("expected LlmError::ModelNotPulled, got {other:?}"),
}
let rendered = format!("{err}");
assert!(
rendered.contains("ollama pull qwen2.5:7b-instruct"),
"missing pull hint in: {rendered}"
);
}
#[tokio::test]
async fn other_4xx_maps_to_stream_error() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(400).set_body_string("bad request"))
.mount(&server)
.await;
let err = run_expecting_request_error(cfg_for_endpoint(&server.uri()), sample_request()).await;
let llm_err = err.downcast_ref::<LlmError>().expect("expected LlmError");
assert!(
matches!(llm_err, LlmError::Stream(_)),
"expected Stream variant, got {llm_err:?}"
);
}
// ── Finish-reason mapping ─────────────────────────────────────────────────
#[tokio::test]
async fn done_reason_length_maps_to_finish_reason_length() {
let server = MockServer::start().await;
let body = concat!(
r#"{"response":"a","done":false}"#, "\n",
r#"{"response":"","done":true,"done_reason":"length","prompt_eval_count":1,"eval_count":1,"total_duration":0}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request())
.await
.unwrap();
match chunks.last().unwrap() {
TokenChunk::Done { finish_reason, .. } => {
assert!(matches!(finish_reason, FinishReason::Length));
}
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn done_reason_abort_maps_to_finish_reason_aborted() {
let server = MockServer::start().await;
let body = concat!(
r#"{"response":"a","done":false}"#, "\n",
r#"{"response":"","done":true,"done_reason":"abort","prompt_eval_count":1,"eval_count":1,"total_duration":0}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request())
.await
.unwrap();
match chunks.last().unwrap() {
TokenChunk::Done { finish_reason, .. } => {
assert!(matches!(finish_reason, FinishReason::Aborted));
}
other => panic!("expected Done, got {other:?}"),
}
}
// ── Resilience ────────────────────────────────────────────────────────────
#[tokio::test]
async fn missing_eval_counts_default_to_zero() {
// Older Ollama (< ~0.1.40) sometimes omitted prompt_eval_count /
// eval_count entirely. Per §10 we degrade gracefully + warn rather
// than failing the stream. Test asserts the zero default; the warn
// is observed only via tracing-subscriber, which we do not wire up
// here — the comment documents the intent.
let server = MockServer::start().await;
let body = concat!(
r#"{"response":"hi","done":false}"#, "\n",
// No prompt_eval_count / eval_count / total_duration.
r#"{"response":"","done":true,"done_reason":"stop"}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request())
.await
.unwrap();
match chunks.last().unwrap() {
TokenChunk::Done { usage, .. } => {
assert_eq!(usage.prompt_tokens, 0);
assert_eq!(usage.completion_tokens, 0);
assert_eq!(usage.latency_ms, 0);
}
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn missing_done_reason_defaults_to_stop() {
let server = MockServer::start().await;
let body = concat!(
r#"{"response":"hi","done":false}"#, "\n",
// Final frame omits done_reason entirely.
r#"{"response":"","done":true,"prompt_eval_count":1,"eval_count":1,"total_duration":0}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request())
.await
.unwrap();
match chunks.last().unwrap() {
TokenChunk::Done { finish_reason, .. } => {
assert!(matches!(finish_reason, FinishReason::Stop));
}
other => panic!("expected Done, got {other:?}"),
}
}
// ── Non-NDJSON 200 body ───────────────────────────────────────────────────
#[tokio::test]
async fn non_ndjson_200_body_maps_to_stream_not_malformed() {
// Misrouted reverse proxy returning a 200 with an HTML error page is
// the canonical case: status code says "ok", body is nowhere near
// NDJSON. Per §10 taxonomy the first-line parse failure on such a
// response surfaces as `LlmError::Stream`, not `Malformed`
// ("Malformed" is reserved for mid-stream corruption after at least
// one valid NDJSON line).
let server = MockServer::start().await;
let html = "<html><body><h1>500 Internal Server Error</h1></body></html>";
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(html))
.mount(&server)
.await;
let chunks = collect_chunks(cfg_for_endpoint(&server.uri()), sample_request()).await;
let err = chunks.expect_err("expected the iterator to surface an error");
let llm_err = err
.downcast_ref::<LlmError>()
.unwrap_or_else(|| panic!("expected LlmError, got: {err:?}"));
assert!(
matches!(llm_err, LlmError::Stream(_)),
"first-line non-NDJSON should be Stream, got {llm_err:?}",
);
}
// ── Endpoint URL handling ─────────────────────────────────────────────────
#[tokio::test]
async fn endpoint_with_trailing_slash_does_not_double_slash() {
// The adapter does `format!("{}/api/generate", endpoint.trim_end_matches('/'))`,
// so an endpoint configured with a trailing slash must still resolve
// to a single-slash URL. Two layers of evidence:
// 1. The wiremock matcher `path("/api/generate")` would NOT match a
// request to `//api/generate`, so a successful response itself
// proves the URL is correctly normalized.
// 2. We additionally inspect `MockServer::received_requests()` and
// assert the recorded `Request::url` path is exactly
// `/api/generate` — pinning the invariant explicitly so a future
// regression that "works" via a different mismatch would still
// fail the assertion.
let server = MockServer::start().await;
let body = concat!(
r#"{"response":"ok","done":false}"#, "\n",
r#"{"response":"","done":true,"done_reason":"stop","prompt_eval_count":1,"eval_count":1,"total_duration":0}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
.mount(&server)
.await;
// Append the trailing slash to the wiremock URI.
let endpoint_with_slash = format!("{}/", server.uri());
let cfg = cfg_for_endpoint(&endpoint_with_slash);
let chunks = collect_chunks(cfg, sample_request())
.await
.expect("stream should complete despite trailing slash on endpoint");
// Smoke-check: we got the canned tokens — proves matcher (1) above.
assert!(matches!(&chunks[0], TokenChunk::Token(t) if t == "ok"));
// Evidence (2): inspect the recorded request URL.
let recorded = server
.received_requests()
.await
.expect("wiremock should record requests by default");
assert_eq!(recorded.len(), 1, "expected exactly one request");
let url = &recorded[0].url;
assert_eq!(
url.path(),
"/api/generate",
"request path should be exactly /api/generate (single slash), got {url}",
);
}
// ── Determinism ───────────────────────────────────────────────────────────
#[tokio::test]
async fn determinism_seed_zero_temp_zero_two_runs_identical() {
// Determinism test against a *mock* — wiremock just replays the canned
// response so byte-equality is trivially satisfied. The point of the
// test is to lock in the request shape: when `temperature == 0` and a
// fixed seed are sent, we expect identical client-observed output.
// Real-Ollama determinism is asserted in `tests/integration.rs`
// (#[ignore]) where reproducibility is modulo model-internal nondet.
let server = MockServer::start().await;
let body = concat!(
r#"{"response":"deterministic","done":false}"#, "\n",
r#"{"response":"","done":true,"done_reason":"stop","prompt_eval_count":1,"eval_count":1,"total_duration":0}"#, "\n",
);
Mock::given(method("POST"))
.and(path("/api/generate"))
.respond_with(ResponseTemplate::new(200).set_body_string(body))
// expect 2 calls so wiremock does not reset between them
.expect(2)
.mount(&server)
.await;
let cfg = cfg_for_endpoint(&server.uri());
let req1 = sample_request();
let req2 = sample_request();
let chunks_a = collect_chunks(cfg.clone(), req1).await.unwrap();
let chunks_b = collect_chunks(cfg, req2).await.unwrap();
assert_eq!(chunks_a, chunks_b);
}