diff --git a/crates/kb-store-sqlite/src/documents.rs b/crates/kb-store-sqlite/src/documents.rs index 39cdeba..ed30d79 100644 --- a/crates/kb-store-sqlite/src/documents.rs +++ b/crates/kb-store-sqlite/src/documents.rs @@ -16,10 +16,14 @@ use rusqlite::params; use time::OffsetDateTime; use crate::error::StoreError; -use crate::store::{SqliteStore, upsert_asset_row}; +use crate::store::{SqliteStore, upsert_asset_row, validate_asset_id}; impl kb_core::DocumentStore for SqliteStore { fn put_asset(&self, asset: &kb_core::RawAsset) -> Result<()> { + // Validate the AssetId shape before any row work — defense in + // depth against hand-constructed `kb_core::AssetId` values that + // bypass `FromStr`. See `validate_asset_id` for rationale. + validate_asset_id(&asset.asset_id)?; // No bytes here — read storage_kind/storage_path from the // RawAsset's `stored` field per its convention (§3.3). Callers // that have raw bytes go through `put_asset_with_bytes` instead; @@ -33,12 +37,12 @@ impl kb_core::DocumentStore for SqliteStore { ("reference", path.to_string_lossy().into_owned()) } }; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); upsert_asset_row(&conn, asset, storage_kind, &storage_path) } fn put_document(&self, doc: &kb_core::CanonicalDocument) -> Result<()> { - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); let tx = conn.transaction().map_err(StoreError::from)?; upsert_document(&tx, doc)?; replace_document_tags(&tx, &doc.doc_id, &doc.metadata.tags)?; @@ -51,7 +55,7 @@ impl kb_core::DocumentStore for SqliteStore { doc: &kb_core::DocumentId, blocks: &[kb_core::Block], ) -> Result<()> { - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); let tx = conn.transaction().map_err(StoreError::from)?; // DELETE-then-INSERT: §5.4 has no UNIQUE on (doc_id, ordinal) // so we cannot rely on UPSERT to surface block_id collisions. The @@ -97,7 +101,7 @@ impl kb_core::DocumentStore for SqliteStore { let now = OffsetDateTime::now_utc() .format(&time::format_description::well_known::Rfc3339) .context("format chunk created_at")?; - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); let tx = conn.transaction().map_err(StoreError::from)?; tx.execute("DELETE FROM chunks WHERE doc_id = ?", params![doc.0]) .map_err(StoreError::from)?; @@ -144,7 +148,7 @@ impl kb_core::DocumentStore for SqliteStore { &self, id: &kb_core::DocumentId, ) -> Result> { - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let row: Option = conn .query_row( "SELECT @@ -205,7 +209,7 @@ impl kb_core::DocumentStore for SqliteStore { } fn get_chunk(&self, id: &kb_core::ChunkId) -> Result> { - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let row = conn .query_row( "SELECT @@ -248,7 +252,7 @@ impl kb_core::DocumentStore for SqliteStore { // Build a dynamic WHERE clause from the filter. Each condition // appends one positional `?` placeholder and one `Box` to `params` so order stays in sync. - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let mut sql = String::from( "SELECT d.doc_id, d.workspace_path, d.title, d.lang, d.source_type, d.trust_level, d.parser_version, diff --git a/crates/kb-store-sqlite/src/jobs.rs b/crates/kb-store-sqlite/src/jobs.rs index 89d71dd..4493822 100644 --- a/crates/kb-store-sqlite/src/jobs.rs +++ b/crates/kb-store-sqlite/src/jobs.rs @@ -29,7 +29,7 @@ impl kb_core::JobRepo for SqliteStore { let kind_label = job_kind_label(&kind); let payload_json = serde_json::to_string(&payload) .context("serialize job payload")?; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); conn.execute( "INSERT INTO jobs ( job_id, kind, status, payload_json, progress_json, @@ -51,7 +51,7 @@ impl kb_core::JobRepo for SqliteStore { let now = OffsetDateTime::now_utc() .format(&time::format_description::well_known::Rfc3339) .context("format job updated_at")?; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); // status='pending' → 'running' on first progress update; later // progress calls keep status='running' until finish(). conn.execute( @@ -80,7 +80,7 @@ impl kb_core::JobRepo for SqliteStore { .map(|e| serde_json::to_string(&serde_json::json!({ "message": e }))) .transpose() .context("serialize job error")?; - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); conn.execute( "UPDATE jobs SET status = ?, @@ -98,7 +98,7 @@ impl kb_core::JobRepo for SqliteStore { &self, filter: &kb_core::JobFilter, ) -> Result> { - let conn = self.conn.lock().expect("sqlite mutex poisoned"); + let conn = self.lock_conn(); let mut sql = String::from( "SELECT job_id, kind, status, payload_json, progress_json, error_json, created_at, updated_at, finished_at diff --git a/crates/kb-store-sqlite/src/store.rs b/crates/kb-store-sqlite/src/store.rs index 72a605e..92d9837 100644 --- a/crates/kb-store-sqlite/src/store.rs +++ b/crates/kb-store-sqlite/src/store.rs @@ -7,7 +7,8 @@ //! mutex on the hot path. use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Mutex, MutexGuard}; use anyhow::{Context, Result}; use rusqlite::Connection; @@ -15,6 +16,15 @@ use rusqlite::Connection; use crate::error::StoreError; use crate::schema; +/// Monotonic counter used to namespace per-process temp file names so +/// concurrent `put_asset_with_bytes` calls in the same millisecond cannot +/// collide on `.tmp..`. +static TEMP_SUFFIX_COUNTER: AtomicU64 = AtomicU64::new(0); + +/// Length, in hex chars, of a valid `kb_core::AssetId`. blake3 first-half +/// truncated, mirrored from `kb-core`'s newtype invariant. +const ASSET_ID_HEX_LEN: usize = 32; + /// Default file name under `config.storage.data_dir`. Kept private — the /// path layout is a §6.3 design decision, not part of the store's public /// surface. @@ -80,7 +90,7 @@ impl SqliteStore { /// Apply all pending migrations bundled at compile time /// (`migrations/V001__init.sql` and any later additions). pub fn run_migrations(&self) -> Result<()> { - let mut conn = self.conn.lock().expect("sqlite mutex poisoned"); + let mut conn = self.lock_conn(); schema::runner() .run(&mut *conn) .map_err(|e| StoreError::Migration(e.to_string()))?; @@ -88,6 +98,17 @@ impl SqliteStore { Ok(()) } + /// Acquire the connection mutex, recovering from poison. + /// + /// Poisoning here means a previous holder panicked while holding the + /// guard. The active rusqlite transaction (if any) was rolled back by + /// the `Transaction` `Drop` impl, so the connection state is still + /// safe to reuse — we simply unwrap the inner guard rather than + /// propagate the panic to every subsequent call. + pub(crate) fn lock_conn(&self) -> MutexGuard<'_, Connection> { + self.conn.lock().unwrap_or_else(|p| p.into_inner()) + } + /// Persist a `RawAsset` *with its raw bytes*: row goes into `assets`, /// bytes go to `data_dir/assets//` if `byte_len ≤ /// copy_threshold_mb`, otherwise the row records the source URI's @@ -101,6 +122,13 @@ impl SqliteStore { asset: &kb_core::RawAsset, bytes: &[u8], ) -> Result<()> { + // 0. Validate the AssetId shape before any I/O. `kb_core::AssetId` + // is a `pub String` newtype: `FromStr` enforces the 32-hex-char + // invariant, but a hand-constructed `AssetId("../etc/passwd…")` + // can bypass that and reach `assets_path_for`. Refuse such IDs at + // the store boundary to keep shard-dir slicing safe. + validate_asset_id(&asset.asset_id)?; + // 1. Verify the caller's checksum matches what's actually on the // wire. A drift here means the bytes the parser saw and the bytes // we're about to durably store disagree — refuse persistence. @@ -116,54 +144,103 @@ impl SqliteStore { // 2. Decide copy vs. reference. The threshold compares the // declared `byte_len` (caller-vouched) rather than `bytes.len()` // because some sources stream and `byte_len` is authoritative. - let (storage_kind, storage_path) = if asset.byte_len <= self.copy_threshold_bytes { + if asset.byte_len <= self.copy_threshold_bytes { + // Copy mode. To prevent file orphans on UPSERT failure we use + // the temp-file + atomic-rename pattern: + // (a) write bytes to `.tmp..` + // (b) fsync the temp file + // (c) UPSERT the row + // (d) on UPSERT success: rename temp → final (atomic on + // same fs) + // (e) on any failure between (a) and (d): best-effort delete + // of the temp file so we never leak bytes on disk. let dest = self.assets_path_for(&asset.asset_id); if let Some(parent) = dest.parent() { std::fs::create_dir_all(parent).with_context(|| { format!("create asset shard dir {}", parent.display()) })?; } - std::fs::write(&dest, bytes) - .with_context(|| format!("write asset bytes to {}", dest.display()))?; - // Mirror §6.6: files 0o644. - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let mut perms = std::fs::metadata(&dest)?.permissions(); - perms.set_mode(0o644); - std::fs::set_permissions(&dest, perms).with_context(|| { - format!("chmod 0o644 on {}", dest.display()) + let temp_path = temp_path_for(&dest); + // Inline closure so any `?` in (a)/(b) cleans up the temp + // file before bubbling out. + let write_and_upsert = || -> Result<()> { + { + let mut f = std::fs::File::create(&temp_path).with_context(|| { + format!("create temp asset file {}", temp_path.display()) + })?; + use std::io::Write; + f.write_all(bytes).with_context(|| { + format!("write asset bytes to {}", temp_path.display()) + })?; + f.sync_all().with_context(|| { + format!("fsync temp asset file {}", temp_path.display()) + })?; + } + // Mirror §6.6: files 0o644. + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&temp_path)?.permissions(); + perms.set_mode(0o644); + std::fs::set_permissions(&temp_path, perms).with_context(|| { + format!("chmod 0o644 on {}", temp_path.display()) + })?; + } + // UPSERT the row first; only after a successful row write + // do we publish the file via rename. A second + // `put_asset_with_bytes` for the same asset_id overwrites + // in place. + { + let conn = self.lock_conn(); + upsert_asset_row( + &conn, + asset, + "copied", + &dest.to_string_lossy(), + )?; + } + std::fs::rename(&temp_path, &dest).with_context(|| { + format!( + "atomic rename {} -> {}", + temp_path.display(), + dest.display() + ) })?; + Ok(()) + }; + match write_and_upsert() { + Ok(()) => Ok(()), + Err(e) => { + // Best-effort cleanup; ignore errors so the original + // failure (likely the more useful one) propagates. + let _ = std::fs::remove_file(&temp_path); + Err(e) + } } - ("copied", dest.to_string_lossy().into_owned()) } else { // Reference: caller's source path is recorded verbatim. We // accept either a `File(path)` or `Kb(uri)` SourceUri; the - // latter stores the raw `kb://...` string. - let path = match &asset.source_uri { + // latter stores the raw `kb://...` string. No file I/O ⇒ no + // orphan risk; just UPSERT the row. + let storage_path = match &asset.source_uri { kb_core::SourceUri::File(p) => p.to_string_lossy().into_owned(), kb_core::SourceUri::Kb(u) => u.clone(), }; - ("reference", path) - }; - - // 3. UPSERT the assets row. A second `put_asset_with_bytes` for - // the same asset_id (e.g. re-ingest) overwrites in place — the - // row is uniquely keyed by asset_id and re-derived from the - // RawAsset every time. - let conn = self.conn.lock().expect("sqlite mutex poisoned"); - upsert_asset_row(&conn, asset, storage_kind, &storage_path)?; - Ok(()) + let conn = self.lock_conn(); + upsert_asset_row(&conn, asset, "reference", &storage_path)?; + Ok(()) + } } /// Compute the `data_dir/assets//` path for an asset. /// `` is the first [`ASSET_SHARD_LEN`] hex chars of `asset_id`. + /// + /// Callers that build paths from caller-controlled IDs MUST first + /// invoke [`validate_asset_id`] (already enforced at every store + /// entry that takes a `RawAsset`). The `id.len() >= ASSET_SHARD_LEN` + /// guard below is a defense-in-depth fallback only. pub(crate) fn assets_path_for(&self, asset_id: &kb_core::AssetId) -> PathBuf { let id = &asset_id.0; - // Defensive: kb-core enforces 32 hex chars on AssetId construction - // (`FromStr` validates). If a future code path bypasses that, we - // fall back to the full id as the shard so we never panic on - // slicing. let shard = if id.len() >= ASSET_SHARD_LEN { &id[..ASSET_SHARD_LEN] } else { @@ -173,6 +250,38 @@ impl SqliteStore { } } +/// Reject an `AssetId` whose shape would let a malicious caller escape +/// the `data_dir/assets//` shard tree. `kb_core::AssetId(pub String)` +/// permits hand-construction, so any function that turns an `AssetId` +/// into a filesystem path must call this first. +pub(crate) fn validate_asset_id(asset_id: &kb_core::AssetId) -> Result<()> { + if asset_id.0.len() != ASSET_ID_HEX_LEN + || !asset_id.0.bytes().all(|b| b.is_ascii_hexdigit()) + { + anyhow::bail!( + "invalid AssetId shape (expected {} ASCII hex chars): {:?}", + ASSET_ID_HEX_LEN, + asset_id.0 + ); + } + Ok(()) +} + +/// Compute a per-call temp-file path next to `dest` that is unlikely to +/// collide with any other in-flight writer (process pid + monotonic +/// counter). The temp file lives in the same parent directory so the +/// final `rename` is an atomic same-filesystem rename on POSIX. +fn temp_path_for(dest: &Path) -> PathBuf { + let pid = std::process::id(); + let n = TEMP_SUFFIX_COUNTER.fetch_add(1, Ordering::Relaxed); + let parent = dest.parent().unwrap_or_else(|| Path::new(".")); + let file_name = dest + .file_name() + .map(|s| s.to_string_lossy().into_owned()) + .unwrap_or_else(|| "asset".to_string()); + parent.join(format!("{file_name}.tmp.{pid}.{n}")) +} + /// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes` /// path (which has bytes + computed `storage_kind/path`) and the /// `DocumentStore::put_asset` path (which only has the `RawAsset` and diff --git a/crates/kb-store-sqlite/tests/asset_writer.rs b/crates/kb-store-sqlite/tests/asset_writer.rs index 7aa5bfa..43112a7 100644 --- a/crates/kb-store-sqlite/tests/asset_writer.rs +++ b/crates/kb-store-sqlite/tests/asset_writer.rs @@ -99,6 +99,120 @@ fn reference_mode_does_not_write_file_but_records_path() { assert_eq!(storage_path, "/path/to/original.md"); } +#[test] +fn put_asset_with_bytes_orphan_cleanup_on_upsert_failure() { + // Goal: prove that if the row UPSERT fails AFTER the bytes have been + // staged on disk, no `/` file is left behind. + // + // Lever: the `assets` table has a UNIQUE INDEX on `workspace_path` + // (V001), but the UPSERT is `ON CONFLICT(asset_id)`. So if some other + // row already owns this `workspace_path`, the INSERT half of the + // UPSERT trips a UNIQUE constraint that the ON CONFLICT clause does + // NOT handle — UPSERT errors. The new asset's bytes were already + // staged; we assert they are NOT visible at the final destination. + let env = common::TestEnv::with_threshold(100); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + // Pre-populate a row that owns `notes/foo.md` (the workspace_path our + // fixture asset will also claim) under a *different* asset_id. + env.with_conn(|c| { + c.execute( + "INSERT INTO assets ( + asset_id, source_uri, workspace_path, media_type, byte_len, + checksum, storage_kind, storage_path, discovered_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + rusqlite::params![ + "b".repeat(32), + "file:///elsewhere/foo.md", + "notes/foo.md", + "\"markdown\"", + 7_i64, + "0".repeat(64), + "reference", + "/elsewhere/foo.md", + "2024-01-01T00:00:00Z", + ], + ) + }); + + let bytes = b"hello, sqlite"; + let cs = b3_full_hex(bytes); + let asset = fixed_asset(bytes, bytes.len() as u64, &cs); + + let err = store + .put_asset_with_bytes(&asset, bytes) + .expect_err("UPSERT must fail on workspace_path UNIQUE violation"); + let msg = format!("{err:#}"); + assert!( + msg.to_lowercase().contains("unique") || msg.to_lowercase().contains("constraint"), + "expected UNIQUE constraint failure, got: {msg}" + ); + + // Final destination must NOT exist (no orphan). + let aa = &asset.asset_id.0[..2]; + let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0); + assert!( + !dest.exists(), + "asset bytes were left orphan at {} after UPSERT failure", + dest.display() + ); + // No `*.tmp.*` either — temp file must be cleaned up too. + let shard_dir = env.data_dir().join("assets").join(aa); + if let Ok(entries) = std::fs::read_dir(&shard_dir) { + for entry in entries.flatten() { + let name = entry.file_name(); + let s = name.to_string_lossy(); + assert!( + !s.contains(".tmp."), + "temp file leaked at {}", + entry.path().display() + ); + } + } +} + +#[test] +fn put_asset_with_bytes_rejects_invalid_asset_id() { + // `kb_core::AssetId(pub String)` lets a hand-construction bypass the + // 32-hex `FromStr` invariant. The store boundary must reject any ID + // whose shape would let path construction escape `data_dir/assets/`. + let env = common::TestEnv::with_threshold(100); + let store = SqliteStore::open(&env.config()).unwrap(); + store.run_migrations().unwrap(); + + // 32 chars but contains a `/` — would let `assets_path_for` stitch + // together a path outside the shard tree. + let evil_id = "../etc/passwd_padded_to_xx_xxxxx".to_string(); + assert_eq!(evil_id.len(), 32, "test fixture must be 32 chars to exercise length-only checks"); + let mut asset = fixed_asset(b"x", 1, &b3_full_hex(b"x")); + asset.asset_id = AssetId(evil_id.clone()); + + let err = store + .put_asset_with_bytes(&asset, b"x") + .expect_err("must reject non-hex AssetId"); + let msg = format!("{err:#}"); + assert!( + msg.contains("invalid AssetId shape"), + "expected AssetId-shape rejection, got: {msg}" + ); + + // And the bytes must NOT have been staged anywhere under the assets + // tree (no I/O should have happened before validation). + let assets_dir = env.data_dir().join("assets"); + if assets_dir.exists() { + for entry in std::fs::read_dir(&assets_dir).unwrap().flatten() { + // Recurse one level into shard dirs and assert empty. + if let Some(sub) = std::fs::read_dir(entry.path()).unwrap().flatten().next() { + panic!( + "invalid AssetId still produced filesystem artifact at {}", + sub.path().display() + ); + } + } + } +} + #[test] fn checksum_mismatch_returns_conflict() { let env = common::TestEnv::new();