From c6cc1e2bfef6ac327143df3eeaef160060aa2838 Mon Sep 17 00:00:00 2001 From: th-kim0823 Date: Sun, 10 May 2026 03:50:56 +0900 Subject: [PATCH] feat(search/vector): media / ingested_after / doc_id filters (fb-36) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit filter_chunks helper in kebab-store-sqlite extended with the same 3 WHERE clauses as lexical. Vector still over-fetches k*2 then post-filters via SqliteStore::filter_chunks; small k can return < k hits when filters drop a lot — agent is expected to widen k or paginate. AND combinator with existing filters. - kebab-store-sqlite/src/filters.rs: media IN-list subquery, ingested_after lexicographic >= compare, doc_id equality; mirrors lexical SQL arms - 3 direct unit tests (filter_chunks_media_type/ingested_after/doc_id) that run without AVX/Lance - common/mod.rs: insert_doc / insert_doc_with_media / run_vector_search helpers on HybridEnv for integration-test use - hybrid.rs: 2 new #[ignore = "requires AVX..."] integration tests (vector_filter_by_media, vector_filter_by_doc_id) Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/kebab-search/tests/common/mod.rs | 91 +++++++++- crates/kebab-search/tests/hybrid.rs | 53 +++++- crates/kebab-store-sqlite/src/filters.rs | 217 +++++++++++++++++++++++ 3 files changed, 359 insertions(+), 2 deletions(-) diff --git a/crates/kebab-search/tests/common/mod.rs b/crates/kebab-search/tests/common/mod.rs index 69b87bd..d0ae1ad 100644 --- a/crates/kebab-search/tests/common/mod.rs +++ b/crates/kebab-search/tests/common/mod.rs @@ -19,7 +19,9 @@ use std::sync::Arc; use kebab_config::Config; use kebab_core::{ ChunkId, DocumentId, EmbeddingId, EmbeddingInput, EmbeddingKind, - EmbeddingModelId, EmbeddingVersion, IndexVersion, VectorRecord, VectorStore, + EmbeddingModelId, EmbeddingVersion, IndexVersion, MediaType, + Retriever, SearchFilters, SearchHit, SearchMode, SearchQuery, + VectorRecord, VectorStore, }; use kebab_embed::{Embedder, MockEmbedder}; use kebab_search::{LexicalRetriever, VectorRetriever}; @@ -173,6 +175,93 @@ impl HybridEnv { .unwrap(); } + /// High-level helper: seed a doc with the default media type + /// (Markdown) and embed its text. Returns the `DocumentId` so + /// callers can use it in `doc_id` filter tests. + pub fn insert_doc(&self, path: &str, text: &str) -> DocumentId { + self.insert_doc_with_media(path, text, MediaType::Markdown) + } + + /// High-level helper: seed a doc with an explicit `MediaType`. + /// The `media_type` is serialized to JSON (mirrors how + /// `DocumentStore::put_document` writes it) and stored in `assets`. + pub fn insert_doc_with_media( + &self, + path: &str, + text: &str, + media: MediaType, + ) -> DocumentId { + // Derive deterministic IDs from the path so repeated calls with + // the same path are idempotent (INSERT OR IGNORE). + let path_hash: String = { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut h = DefaultHasher::new(); + path.hash(&mut h); + format!("{:032x}", h.finish()) + }; + let doc_id = format!("d{}", &path_hash[..31]); + let chunk_id = format!("c{}", &path_hash[..31]); + let asset_id = format!("a{}", &path_hash[..31]); + + let media_json = serde_json::to_string(&media).expect("serialize MediaType"); + let conn = self.sqlite.read_conn(); + conn.execute( + "INSERT OR IGNORE INTO assets ( + asset_id, source_uri, workspace_path, media_type, byte_len, + checksum, storage_kind, storage_path, discovered_at + ) VALUES (?, ?, ?, ?, 0, + 'deadbeefdeadbeefdeadbeefdeadbeef', + 'reference', ?, '1970-01-01T00:00:00Z')", + params![ + asset_id, + format!("file:///{path}"), + path, + media_json, + path, + ], + ) + .unwrap(); + conn.execute( + "INSERT OR IGNORE INTO documents ( + doc_id, asset_id, workspace_path, title, lang, source_type, + trust_level, parser_version, doc_version, schema_version, + metadata_json, provenance_json, created_at, updated_at + ) VALUES (?, ?, ?, NULL, 'en', 'markdown', 'primary', 'v1', 1, 1, + '{}', '{}', '1970-01-01T00:00:00Z', '1970-01-01T00:00:00Z')", + params![doc_id, asset_id, path], + ) + .unwrap(); + let heading_json = "[]"; + conn.execute( + "INSERT OR IGNORE INTO chunks ( + chunk_id, doc_id, text, heading_path_json, section_label, + source_spans_json, token_estimate, chunker_version, + policy_hash, block_ids_json, created_at + ) VALUES (?, ?, ?, ?, NULL, + '[{\"kind\":\"line\",\"start\":1,\"end\":1}]', + 1, 'v1', 'h', '[]', '1970-01-01T00:00:00Z')", + params![chunk_id, doc_id, text, heading_json], + ) + .unwrap(); + drop(conn); + self.embed_and_upsert(&chunk_id, &doc_id, text, &[]); + DocumentId(doc_id) + } + + /// Run a `SearchMode::Vector` query against the seeded corpus and + /// return the resulting `Vec`. + pub fn run_vector_search(&self, query: &str, filters: &SearchFilters) -> Vec { + let r = self.vector_retriever(); + let q = SearchQuery { + text: query.to_string(), + mode: SearchMode::Vector, + k: 10, + filters: filters.clone(), + }; + r.search(&q).expect("vector search") + } + /// Embed `text` as a Document and upsert it as the embedding for /// `chunk_id`. Drives the same code path production uses: /// MockEmbedder → VectorRecord → LanceVectorStore::upsert → diff --git a/crates/kebab-search/tests/hybrid.rs b/crates/kebab-search/tests/hybrid.rs index 13f945d..912422a 100644 --- a/crates/kebab-search/tests/hybrid.rs +++ b/crates/kebab-search/tests/hybrid.rs @@ -15,7 +15,7 @@ use common::{ HybridEnv, id32, require_avx_or_panic, TEST_LEX_INDEX_VERSION, TEST_VEC_INDEX_VERSION, }; use kebab_core::{ - Retriever, SearchFilters, SearchHit, SearchMode, SearchQuery, + MediaType, Retriever, SearchFilters, SearchHit, SearchMode, SearchQuery, }; use kebab_search::{FusionPolicy, HybridRetriever}; use rusqlite::params; @@ -213,6 +213,57 @@ fn hybrid_snapshot_run_1() { } } +/// p9-fb-36: vector post-filter must pass `media` through `filter_chunks`. +/// Seeding two docs (markdown + pdf) and filtering for pdf-only must +/// return only the pdf chunk, proving `LanceVectorStore::search` → +/// `SqliteStore::filter_chunks` correctly applies the media arm. +#[test] +#[ignore = "requires AVX-capable hardware (LanceDB)"] +fn vector_filter_by_media() { + require_avx_or_panic(); + let env = HybridEnv::new(); + env.insert_doc_with_media("md1.md", "rust ownership", MediaType::Markdown); + env.insert_doc_with_media("doc.pdf", "rust pdf body", MediaType::Pdf); + + let filters = SearchFilters { + media: vec!["pdf".to_string()], + ..Default::default() + }; + let hits = env.run_vector_search("rust", &filters); + assert_eq!(hits.len(), 1, "media filter must keep only pdf chunk"); + assert!( + hits[0].doc_path.0.ends_with(".pdf"), + "expected .pdf path, got: {}", + hits[0].doc_path.0 + ); +} + +/// p9-fb-36: vector post-filter must pass `doc_id` through `filter_chunks`. +/// Seeding two docs with shared text, filtering by one doc_id must return +/// only chunks from that doc. +#[test] +#[ignore = "requires AVX-capable hardware (LanceDB)"] +fn vector_filter_by_doc_id() { + require_avx_or_panic(); + let env = HybridEnv::new(); + let target = env.insert_doc("a.md", "shared knowledge"); + env.insert_doc("b.md", "shared knowledge"); + + let filters = SearchFilters { + doc_id: Some(target.clone()), + ..Default::default() + }; + let hits = env.run_vector_search("shared", &filters); + assert!( + !hits.is_empty(), + "doc_id filter must return hits for the target doc" + ); + assert!( + hits.iter().all(|h| h.doc_id == target), + "all hits must belong to the target doc_id" + ); +} + #[test] #[ignore = "requires AVX-capable hardware (LanceDB)"] fn vector_hit_carries_indexed_at() { diff --git a/crates/kebab-store-sqlite/src/filters.rs b/crates/kebab-store-sqlite/src/filters.rs index 2b1ff00..4586236 100644 --- a/crates/kebab-store-sqlite/src/filters.rs +++ b/crates/kebab-store-sqlite/src/filters.rs @@ -129,6 +129,47 @@ impl SqliteStore { } } + // p9-fb-36: media_type filter (IN-list). + // `assets.media_type` JSON has two shapes: + // - unit variant (Markdown / Pdf / …): JSON text, e.g. `"markdown"` + // - tuple variant (Image(Png) / Audio(Mp3) / Other(s)): JSON object, + // e.g. `{"image": "png"}` + // Extract a unified "kind" string for both shapes; mirrors lexical. + if !filters.media.is_empty() { + let media_ph = std::iter::repeat_n("?", filters.media.len()) + .collect::>() + .join(","); + sql.push_str(&format!( + " AND d.doc_id IN (\ + SELECT d2.doc_id FROM documents d2 \ + JOIN assets a ON a.asset_id = d2.asset_id \ + WHERE CASE \ + WHEN json_type(a.media_type) = 'text' THEN json_extract(a.media_type, '$') \ + ELSE (SELECT key FROM json_each(a.media_type) LIMIT 1) \ + END IN ({media_ph}))" + )); + for kind in &filters.media { + bind.push(Box::new(kind.clone())); + } + } + + // p9-fb-36: ingested_after filter. + // `documents.updated_at` is RFC3339 TEXT (UTC `Z` per fb-32); + // lexicographic >= compare is correct. + if let Some(after) = &filters.ingested_after { + let formatted = after + .format(&time::format_description::well_known::Rfc3339) + .expect("OffsetDateTime formats to RFC3339"); + sql.push_str(" AND d.updated_at >= ?"); + bind.push(Box::new(formatted)); + } + + // p9-fb-36: doc_id filter — single-doc scoping. + if let Some(id) = &filters.doc_id { + sql.push_str(" AND d.doc_id = ?"); + bind.push(Box::new(id.0.clone())); + } + // Optional path_glob: applied in Rust on the rows we get back, // not in SQL — matching `kb-search::lexical`'s post-filter so // the glob semantics are byte-identical between retrievers. @@ -280,6 +321,89 @@ mod tests { .unwrap(); } + /// Variant of `seed_committed` that accepts an explicit `media_type` + /// JSON string (e.g. `r#""markdown""#` or `r#""pdf""#`) and an + /// explicit `updated_at` RFC3339 string so the fb-36 filter tests can + /// exercise `media` and `ingested_after` without going through the full + /// ingest pipeline. + #[allow(clippy::too_many_arguments)] + fn seed_committed_full( + store: &SqliteStore, + chunk_id: &str, + doc_id: &str, + workspace_path: &str, + lang: &str, + tags: &[&str], + trust: &str, + media_type_json: &str, + updated_at: &str, + ) { + let asset_id = format!("a{}", &doc_id[..31]); + { + let conn = store.lock_conn(); + conn.execute( + "INSERT INTO assets ( + asset_id, source_uri, workspace_path, media_type, byte_len, + checksum, storage_kind, storage_path, discovered_at + ) VALUES (?, ?, ?, ?, 0, 'deadbeefdeadbeefdeadbeefdeadbeef', + 'reference', ?, '1970-01-01T00:00:00Z')", + params![ + asset_id, + format!("file://{workspace_path}"), + workspace_path, + media_type_json, + workspace_path, + ], + ) + .unwrap(); + conn.execute( + "INSERT INTO documents ( + doc_id, asset_id, workspace_path, title, lang, source_type, + trust_level, parser_version, doc_version, schema_version, + metadata_json, provenance_json, created_at, updated_at + ) VALUES (?, ?, ?, NULL, ?, 'markdown', ?, 'v1', 1, 1, + '{}', '{}', '1970-01-01T00:00:00Z', ?)", + params![doc_id, asset_id, workspace_path, lang, trust, updated_at], + ) + .unwrap(); + for t in tags { + conn.execute( + "INSERT INTO document_tags (doc_id, tag) VALUES (?, ?)", + params![doc_id, t], + ) + .unwrap(); + } + conn.execute( + "INSERT INTO chunks ( + chunk_id, doc_id, text, heading_path_json, section_label, + source_spans_json, token_estimate, chunker_version, + policy_hash, block_ids_json, created_at + ) VALUES (?, ?, 'hi', '[]', NULL, '[]', 1, 'v1', 'h', '[]', + '1970-01-01T00:00:00Z')", + params![chunk_id, doc_id], + ) + .unwrap(); + } + + let embed_row = EmbeddingRecordRow { + embedding_id: format!("e{}", &chunk_id[..31]), + chunk_id: chunk_id.to_string(), + model_id: "m".to_string(), + model_version: "v1".to_string(), + dimensions: 4, + lance_table: "t".to_string(), + created_at: OffsetDateTime::UNIX_EPOCH, + }; + store + .put_embedding_records_pending(std::slice::from_ref(&embed_row)) + .unwrap(); + store + .mark_embedding_records_committed(std::slice::from_ref( + &embed_row.embedding_id, + )) + .unwrap(); + } + fn cid(s: &str) -> ChunkId { ChunkId(s.to_string()) } @@ -449,4 +573,97 @@ mod tests { let out = store.filter_chunks(&[], &SearchFilters::default()).unwrap(); assert!(out.is_empty()); } + + // ── p9-fb-36 new filter arms ───────────────────────────────────────── + + #[test] + fn filter_chunks_media_type_keeps_matching_kind() { + // c1 = markdown, c2 = pdf. Filter for pdf → only c2 survives. + let tmp = TempDir::new().unwrap(); + let store = open_store(&tmp); + let c1 = "11111111111111111111111111111111"; + let c2 = "22222222222222222222222222222222"; + seed_committed_full( + &store, c1, "d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1", + "notes/a.md", "en", &[], "primary", + r#""markdown""#, + "1970-01-01T00:00:00Z", + ); + seed_committed_full( + &store, c2, "d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2", + "notes/b.pdf", "en", &[], "primary", + r#""pdf""#, + "1970-01-01T00:00:00Z", + ); + + let f = SearchFilters { + media: vec!["pdf".to_string()], + ..Default::default() + }; + let out = store + .filter_chunks(&[cid(c1), cid(c2)], &f) + .unwrap(); + assert_eq!(out, vec![cid(c2)], "only pdf chunk should survive media filter"); + } + + #[test] + fn filter_chunks_ingested_after_excludes_old_docs() { + // c1 ingested 2020, c2 ingested 2026. filter ingested_after=2025 → only c2. + let tmp = TempDir::new().unwrap(); + let store = open_store(&tmp); + let c1 = "11111111111111111111111111111111"; + let c2 = "22222222222222222222222222222222"; + seed_committed_full( + &store, c1, "d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1", + "old.md", "en", &[], "primary", + r#""markdown""#, + "2020-01-01T00:00:00Z", + ); + seed_committed_full( + &store, c2, "d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2", + "new.md", "en", &[], "primary", + r#""markdown""#, + "2026-01-01T00:00:00Z", + ); + + let f = SearchFilters { + ingested_after: Some(time::macros::datetime!(2025-01-01 00:00:00 UTC)), + ..Default::default() + }; + let out = store + .filter_chunks(&[cid(c1), cid(c2)], &f) + .unwrap(); + assert_eq!(out, vec![cid(c2)], "only post-2025 chunk should survive ingested_after filter"); + } + + #[test] + fn filter_chunks_doc_id_scopes_to_single_doc() { + // c1 belongs to d1, c2 belongs to d2. filter doc_id=d1 → only c1. + let tmp = TempDir::new().unwrap(); + let store = open_store(&tmp); + let c1 = "11111111111111111111111111111111"; + let c2 = "22222222222222222222222222222222"; + let d1 = "d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1d1"; + seed_committed_full( + &store, c1, d1, + "a.md", "en", &[], "primary", + r#""markdown""#, + "1970-01-01T00:00:00Z", + ); + seed_committed_full( + &store, c2, "d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2d2", + "b.md", "en", &[], "primary", + r#""markdown""#, + "1970-01-01T00:00:00Z", + ); + + let f = SearchFilters { + doc_id: Some(kebab_core::DocumentId(d1.to_string())), + ..Default::default() + }; + let out = store + .filter_chunks(&[cid(c1), cid(c2)], &f) + .unwrap(); + assert_eq!(out, vec![cid(c1)], "doc_id filter must scope to the target doc only"); + } }