feat(search/vector): media / ingested_after / doc_id filters (fb-36)

filter_chunks helper in kebab-store-sqlite extended with the same 3
WHERE clauses as lexical. Vector still over-fetches k*2 then
post-filters via SqliteStore::filter_chunks; small k can return < k
hits when filters drop a lot — agent is expected to widen k or
paginate. AND combinator with existing filters.

- kebab-store-sqlite/src/filters.rs: media IN-list subquery, ingested_after
  lexicographic >= compare, doc_id equality; mirrors lexical SQL arms
- 3 direct unit tests (filter_chunks_media_type/ingested_after/doc_id)
  that run without AVX/Lance
- common/mod.rs: insert_doc / insert_doc_with_media / run_vector_search
  helpers on HybridEnv for integration-test use
- hybrid.rs: 2 new #[ignore = "requires AVX..."] integration tests
  (vector_filter_by_media, vector_filter_by_doc_id)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
th-kim0823
2026-05-10 03:50:56 +09:00
parent 86475e5ba2
commit c6cc1e2bfe
3 changed files with 359 additions and 2 deletions

View File

@@ -129,6 +129,47 @@ impl SqliteStore {
}
}
// p9-fb-36: media_type filter (IN-list).
// `assets.media_type` JSON has two shapes:
// - unit variant (Markdown / Pdf / …): JSON text, e.g. `"markdown"`
// - tuple variant (Image(Png) / Audio(Mp3) / Other(s)): JSON object,
// e.g. `{"image": "png"}`
// Extract a unified "kind" string for both shapes; mirrors lexical.
if !filters.media.is_empty() {
let media_ph = std::iter::repeat_n("?", filters.media.len())
.collect::<Vec<_>>()
.join(",");
sql.push_str(&format!(
" AND d.doc_id IN (\
SELECT d2.doc_id FROM documents d2 \
JOIN assets a ON a.asset_id = d2.asset_id \
WHERE CASE \
WHEN json_type(a.media_type) = 'text' THEN json_extract(a.media_type, '$') \
ELSE (SELECT key FROM json_each(a.media_type) LIMIT 1) \
END IN ({media_ph}))"
));
for kind in &filters.media {
bind.push(Box::new(kind.clone()));
}
}
// p9-fb-36: ingested_after filter.
// `documents.updated_at` is RFC3339 TEXT (UTC `Z` per fb-32);
// lexicographic >= compare is correct.
if let Some(after) = &filters.ingested_after {
let formatted = after
.format(&time::format_description::well_known::Rfc3339)
.expect("OffsetDateTime formats to RFC3339");
sql.push_str(" AND d.updated_at >= ?");
bind.push(Box::new(formatted));
}
// p9-fb-36: doc_id filter — single-doc scoping.
if let Some(id) = &filters.doc_id {
sql.push_str(" AND d.doc_id = ?");
bind.push(Box::new(id.0.clone()));
}
// Optional path_glob: applied in Rust on the rows we get back,
// not in SQL — matching `kb-search::lexical`'s post-filter so
// the glob semantics are byte-identical between retrievers.
@@ -280,6 +321,89 @@ mod tests {
.unwrap();
}
/// Variant of `seed_committed` that accepts an explicit `media_type`
/// JSON string (e.g. `r#""markdown""#` or `r#""pdf""#`) and an
/// explicit `updated_at` RFC3339 string so the fb-36 filter tests can
/// exercise `media` and `ingested_after` without going through the full
/// ingest pipeline.
#[allow(clippy::too_many_arguments)]
fn seed_committed_full(
store: &SqliteStore,
chunk_id: &str,
doc_id: &str,
workspace_path: &str,
lang: &str,
tags: &[&str],
trust: &str,
media_type_json: &str,
updated_at: &str,
) {
let asset_id = format!("a{}", &doc_id[..31]);
{
let conn = store.lock_conn();
conn.execute(
"INSERT INTO assets (
asset_id, source_uri, workspace_path, media_type, byte_len,
checksum, storage_kind, storage_path, discovered_at
) VALUES (?, ?, ?, ?, 0, 'deadbeefdeadbeefdeadbeefdeadbeef',
'reference', ?, '1970-01-01T00:00:00Z')",
params![
asset_id,
format!("file://{workspace_path}"),
workspace_path,
media_type_json,
workspace_path,
],
)
.unwrap();
conn.execute(
"INSERT INTO documents (
doc_id, asset_id, workspace_path, title, lang, source_type,
trust_level, parser_version, doc_version, schema_version,
metadata_json, provenance_json, created_at, updated_at
) VALUES (?, ?, ?, NULL, ?, 'markdown', ?, 'v1', 1, 1,
'{}', '{}', '1970-01-01T00:00:00Z', ?)",
params![doc_id, asset_id, workspace_path, lang, trust, updated_at],
)
.unwrap();
for t in tags {
conn.execute(
"INSERT INTO document_tags (doc_id, tag) VALUES (?, ?)",
params![doc_id, t],
)
.unwrap();
}
conn.execute(
"INSERT INTO chunks (
chunk_id, doc_id, text, heading_path_json, section_label,
source_spans_json, token_estimate, chunker_version,
policy_hash, block_ids_json, created_at
) VALUES (?, ?, 'hi', '[]', NULL, '[]', 1, 'v1', 'h', '[]',
'1970-01-01T00:00:00Z')",
params![chunk_id, doc_id],
)
.unwrap();
}
let embed_row = EmbeddingRecordRow {
embedding_id: format!("e{}", &chunk_id[..31]),
chunk_id: chunk_id.to_string(),
model_id: "m".to_string(),
model_version: "v1".to_string(),
dimensions: 4,
lance_table: "t".to_string(),
created_at: OffsetDateTime::UNIX_EPOCH,
};
store
.put_embedding_records_pending(std::slice::from_ref(&embed_row))
.unwrap();
store
.mark_embedding_records_committed(std::slice::from_ref(
&embed_row.embedding_id,
))
.unwrap();
}
fn cid(s: &str) -> ChunkId {
ChunkId(s.to_string())
}
@@ -449,4 +573,97 @@ mod tests {
let out = store.filter_chunks(&[], &SearchFilters::default()).unwrap();
assert!(out.is_empty());
}
// ── p9-fb-36 new filter arms ─────────────────────────────────────────
#[test]
fn filter_chunks_media_type_keeps_matching_kind() {
// c1 = markdown, c2 = pdf. Filter for pdf → only c2 survives.
let tmp = TempDir::new().unwrap();
let store = open_store(&tmp);
let c1 = "11111111111111111111111111111111";
let c2 = "22222222222222222222222222222222";
seed_committed_full(
&store, c1, "d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1",
"notes/a.md", "en", &[], "primary",
r#""markdown""#,
"1970-01-01T00:00:00Z",
);
seed_committed_full(
&store, c2, "d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2",
"notes/b.pdf", "en", &[], "primary",
r#""pdf""#,
"1970-01-01T00:00:00Z",
);
let f = SearchFilters {
media: vec!["pdf".to_string()],
..Default::default()
};
let out = store
.filter_chunks(&[cid(c1), cid(c2)], &f)
.unwrap();
assert_eq!(out, vec![cid(c2)], "only pdf chunk should survive media filter");
}
#[test]
fn filter_chunks_ingested_after_excludes_old_docs() {
// c1 ingested 2020, c2 ingested 2026. filter ingested_after=2025 → only c2.
let tmp = TempDir::new().unwrap();
let store = open_store(&tmp);
let c1 = "11111111111111111111111111111111";
let c2 = "22222222222222222222222222222222";
seed_committed_full(
&store, c1, "d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1",
"old.md", "en", &[], "primary",
r#""markdown""#,
"2020-01-01T00:00:00Z",
);
seed_committed_full(
&store, c2, "d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2",
"new.md", "en", &[], "primary",
r#""markdown""#,
"2026-01-01T00:00:00Z",
);
let f = SearchFilters {
ingested_after: Some(time::macros::datetime!(2025-01-01 00:00:00 UTC)),
..Default::default()
};
let out = store
.filter_chunks(&[cid(c1), cid(c2)], &f)
.unwrap();
assert_eq!(out, vec![cid(c2)], "only post-2025 chunk should survive ingested_after filter");
}
#[test]
fn filter_chunks_doc_id_scopes_to_single_doc() {
// c1 belongs to d1, c2 belongs to d2. filter doc_id=d1 → only c1.
let tmp = TempDir::new().unwrap();
let store = open_store(&tmp);
let c1 = "11111111111111111111111111111111";
let c2 = "22222222222222222222222222222222";
let d1 = "d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1";
seed_committed_full(
&store, c1, d1,
"a.md", "en", &[], "primary",
r#""markdown""#,
"1970-01-01T00:00:00Z",
);
seed_committed_full(
&store, c2, "d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2",
"b.md", "en", &[], "primary",
r#""markdown""#,
"1970-01-01T00:00:00Z",
);
let f = SearchFilters {
doc_id: Some(kebab_core::DocumentId(d1.to_string())),
..Default::default()
};
let out = store
.filter_chunks(&[cid(c1), cid(c2)], &f)
.unwrap();
assert_eq!(out, vec![cid(c1)], "doc_id filter must scope to the target doc only");
}
}