feat(cli): kebab search --bulk flag + stdin ndjson + output stream (fb-42)

This commit is contained in:
th-kim0823
2026-05-10 20:22:45 +09:00
parent 1ebbd6b711
commit 67f2c16cc2
2 changed files with 104 additions and 0 deletions

View File

@@ -171,6 +171,16 @@ enum Cmd {
/// without embeddings via a no-op vector stub.
#[arg(long)]
trace: bool,
/// p9-fb-42: bulk multi-query mode. Reads ndjson from stdin —
/// one JSON object per line, each item shape mirrors the
/// single-query input. Output is per-query ndjson on stdout
/// (one `bulk_search_item.v1` per line) plus a summary line on
/// stderr. Single-query flags (`--mode`, `--k`, `--tag`, etc.)
/// are ignored when `--bulk` is set; pass them per-item in the
/// stdin JSON instead. Caps at 100 queries per call.
#[arg(long)]
bulk: bool,
},
/// Retrieval-augmented question answering.
@@ -678,7 +688,87 @@ fn run(cli: &Cli) -> anyhow::Result<()> {
ingested_after,
doc_id,
trace,
bulk,
} => {
// p9-fb-42: bulk mode — stdin ndjson → bulk_search_with_config
// → stdout ndjson per query + stderr summary. Single-query
// flags are ignored (each item supplies its own).
if *bulk {
use std::io::{BufRead, Write};
let cfg = kebab_config::Config::load(cli.config.as_deref())?;
let stdin = std::io::stdin();
let stdin_locked = stdin.lock();
let mut raw_items: Vec<serde_json::Value> = Vec::new();
for (lineno, line) in stdin_locked.lines().enumerate() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value =
serde_json::from_str(&line).map_err(|e| {
anyhow::Error::new(kebab_app::StructuredError(
kebab_app::ErrorV1 {
schema_version: kebab_app::ERROR_V1_ID
.to_string(),
code: "config_invalid".to_string(),
message: format!(
"stdin ndjson line {} parse error: {e}",
lineno + 1
),
details: serde_json::Value::Null,
hint: Some(
"each line must be a JSON object with at least `query`"
.to_string(),
),
},
))
})?;
raw_items.push(v);
}
let (items, summary) =
kebab_app::bulk_search_with_config(cfg, raw_items)?;
if cli.json {
let mut stdout = std::io::stdout().lock();
for item in &items {
let v = wire::wire_bulk_search_item(item);
writeln!(stdout, "{}", serde_json::to_string(&v)?)?;
}
eprintln!(
"bulk_summary: total={} succeeded={} failed={}",
summary.total, summary.succeeded, summary.failed,
);
} else {
let mut stdout = std::io::stdout().lock();
for (idx, item) in items.iter().enumerate() {
writeln!(
stdout,
"# Query {}: {}",
idx + 1,
serde_json::to_string(&item.query)?,
)?;
if let Some(err) = &item.error {
writeln!(stdout, "error: {}", err)?;
} else if let Some(resp) = &item.response {
writeln!(
stdout,
"{}",
serde_json::to_string_pretty(resp)?
)?;
}
writeln!(stdout)?;
}
eprintln!(
"bulk_summary: total={} succeeded={} failed={}",
summary.total, summary.succeeded, summary.failed,
);
}
return Ok(());
}
let cfg = kebab_config::Config::load(cli.config.as_deref())?;
// p9-fb-36: normalize --media aliases (md → markdown).

View File

@@ -201,6 +201,20 @@ pub fn wire_fetch_result(r: &kebab_core::FetchResult) -> Value {
tag_object(v, "fetch_result.v1")
}
/// p9-fb-42: tag a `BulkSearchItem` (already serialized as a Value)
/// as `bulk_search_item.v1`. The inner `query` / `response` / `error`
/// fields stay verbatim — only the envelope gets the schema_version stamp.
pub fn wire_bulk_search_item(item: &kebab_core::BulkSearchItem) -> Value {
let mut v = serde_json::to_value(item).expect("BulkSearchItem serializes");
if let Value::Object(ref mut map) = v {
map.insert(
"schema_version".to_string(),
Value::String("bulk_search_item.v1".to_string()),
);
}
v
}
#[cfg(test)]
mod tests {
use super::*;