p1-6: harden store boundary (atomic asset write, poison-tolerant mutex, AssetId validation)

Three Important review fixes on the kb-store-sqlite write path:

I1. Atomic asset write. put_asset_with_bytes now stages bytes to
    `<final>.tmp.<pid>.<n>`, fsyncs, UPSERTs the row, then `rename`s into
    place (atomic on POSIX same-fs). On any failure between staging and
    rename we best-effort `remove_file` the temp so the previous orphan
    risk on UPSERT failure is gone. Reference mode is unchanged (no I/O,
    no orphan risk).

I2. Poison-tolerant mutex. New `lock_conn` helper does
    `.lock().unwrap_or_else(|p| p.into_inner())`, so a single panic mid-
    transaction no longer poisons every subsequent store call. The
    rusqlite Transaction Drop already rolls back on panic, leaving the
    Connection state safe to reuse. All 13 prior `.expect("sqlite mutex
    poisoned")` sites in store.rs / documents.rs / jobs.rs now route
    through `lock_conn`.

I3. AssetId shape validation. `kb_core::AssetId(pub String)` lets a
    hand-construction bypass the `FromStr` 32-hex invariant. Added
    `validate_asset_id` (32 ASCII hex chars) at every store entry that
    turns an AssetId into a path: `put_asset_with_bytes` and
    `DocumentStore::put_asset`. This shuts a potential path-traversal via
    `assets_path_for`'s `&id[..2]` shard slice.

Tests:
- `put_asset_with_bytes_orphan_cleanup_on_upsert_failure` — pre-seeds a
  row that takes the same `workspace_path` (UNIQUE), so the UPSERT trips
  a constraint not covered by `ON CONFLICT(asset_id)`. Asserts no final
  file and no leaked `*.tmp.*`.
- `put_asset_with_bytes_rejects_invalid_asset_id` — passes
  `AssetId("../etc/passwd_padded_to_xx_xxxxx")` (32 chars, contains `/`).
  Asserts error and zero filesystem artifacts under `data_dir/assets/`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-30 17:33:19 +00:00
parent efdb71b1c3
commit e41279de96
4 changed files with 269 additions and 42 deletions

View File

@@ -16,10 +16,14 @@ use rusqlite::params;
use time::OffsetDateTime;
use crate::error::StoreError;
use crate::store::{SqliteStore, upsert_asset_row};
use crate::store::{SqliteStore, upsert_asset_row, validate_asset_id};
impl kb_core::DocumentStore for SqliteStore {
fn put_asset(&self, asset: &kb_core::RawAsset) -> Result<()> {
// Validate the AssetId shape before any row work — defense in
// depth against hand-constructed `kb_core::AssetId` values that
// bypass `FromStr`. See `validate_asset_id` for rationale.
validate_asset_id(&asset.asset_id)?;
// No bytes here — read storage_kind/storage_path from the
// RawAsset's `stored` field per its convention (§3.3). Callers
// that have raw bytes go through `put_asset_with_bytes` instead;
@@ -33,12 +37,12 @@ impl kb_core::DocumentStore for SqliteStore {
("reference", path.to_string_lossy().into_owned())
}
};
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
upsert_asset_row(&conn, asset, storage_kind, &storage_path)
}
fn put_document(&self, doc: &kb_core::CanonicalDocument) -> Result<()> {
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
let mut conn = self.lock_conn();
let tx = conn.transaction().map_err(StoreError::from)?;
upsert_document(&tx, doc)?;
replace_document_tags(&tx, &doc.doc_id, &doc.metadata.tags)?;
@@ -51,7 +55,7 @@ impl kb_core::DocumentStore for SqliteStore {
doc: &kb_core::DocumentId,
blocks: &[kb_core::Block],
) -> Result<()> {
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
let mut conn = self.lock_conn();
let tx = conn.transaction().map_err(StoreError::from)?;
// DELETE-then-INSERT: §5.4 has no UNIQUE on (doc_id, ordinal)
// so we cannot rely on UPSERT to surface block_id collisions. The
@@ -97,7 +101,7 @@ impl kb_core::DocumentStore for SqliteStore {
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format chunk created_at")?;
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
let mut conn = self.lock_conn();
let tx = conn.transaction().map_err(StoreError::from)?;
tx.execute("DELETE FROM chunks WHERE doc_id = ?", params![doc.0])
.map_err(StoreError::from)?;
@@ -144,7 +148,7 @@ impl kb_core::DocumentStore for SqliteStore {
&self,
id: &kb_core::DocumentId,
) -> Result<Option<kb_core::CanonicalDocument>> {
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
let row: Option<DocumentRow> = conn
.query_row(
"SELECT
@@ -205,7 +209,7 @@ impl kb_core::DocumentStore for SqliteStore {
}
fn get_chunk(&self, id: &kb_core::ChunkId) -> Result<Option<kb_core::Chunk>> {
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
let row = conn
.query_row(
"SELECT
@@ -248,7 +252,7 @@ impl kb_core::DocumentStore for SqliteStore {
// Build a dynamic WHERE clause from the filter. Each condition
// appends one positional `?` placeholder and one `Box<dyn
// ToSql>` to `params` so order stays in sync.
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
let mut sql = String::from(
"SELECT d.doc_id, d.workspace_path, d.title, d.lang,
d.source_type, d.trust_level, d.parser_version,

View File

@@ -29,7 +29,7 @@ impl kb_core::JobRepo for SqliteStore {
let kind_label = job_kind_label(&kind);
let payload_json = serde_json::to_string(&payload)
.context("serialize job payload")?;
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
conn.execute(
"INSERT INTO jobs (
job_id, kind, status, payload_json, progress_json,
@@ -51,7 +51,7 @@ impl kb_core::JobRepo for SqliteStore {
let now = OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.context("format job updated_at")?;
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
// status='pending' → 'running' on first progress update; later
// progress calls keep status='running' until finish().
conn.execute(
@@ -80,7 +80,7 @@ impl kb_core::JobRepo for SqliteStore {
.map(|e| serde_json::to_string(&serde_json::json!({ "message": e })))
.transpose()
.context("serialize job error")?;
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
conn.execute(
"UPDATE jobs SET
status = ?,
@@ -98,7 +98,7 @@ impl kb_core::JobRepo for SqliteStore {
&self,
filter: &kb_core::JobFilter,
) -> Result<Vec<kb_core::JobRow>> {
let conn = self.conn.lock().expect("sqlite mutex poisoned");
let conn = self.lock_conn();
let mut sql = String::from(
"SELECT job_id, kind, status, payload_json, progress_json,
error_json, created_at, updated_at, finished_at

View File

@@ -7,7 +7,8 @@
//! mutex on the hot path.
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, MutexGuard};
use anyhow::{Context, Result};
use rusqlite::Connection;
@@ -15,6 +16,15 @@ use rusqlite::Connection;
use crate::error::StoreError;
use crate::schema;
/// Monotonic counter used to namespace per-process temp file names so
/// concurrent `put_asset_with_bytes` calls in the same millisecond cannot
/// collide on `<final>.tmp.<pid>.<n>`.
static TEMP_SUFFIX_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Length, in hex chars, of a valid `kb_core::AssetId`. blake3 first-half
/// truncated, mirrored from `kb-core`'s newtype invariant.
const ASSET_ID_HEX_LEN: usize = 32;
/// Default file name under `config.storage.data_dir`. Kept private — the
/// path layout is a §6.3 design decision, not part of the store's public
/// surface.
@@ -80,7 +90,7 @@ impl SqliteStore {
/// Apply all pending migrations bundled at compile time
/// (`migrations/V001__init.sql` and any later additions).
pub fn run_migrations(&self) -> Result<()> {
let mut conn = self.conn.lock().expect("sqlite mutex poisoned");
let mut conn = self.lock_conn();
schema::runner()
.run(&mut *conn)
.map_err(|e| StoreError::Migration(e.to_string()))?;
@@ -88,6 +98,17 @@ impl SqliteStore {
Ok(())
}
/// Acquire the connection mutex, recovering from poison.
///
/// Poisoning here means a previous holder panicked while holding the
/// guard. The active rusqlite transaction (if any) was rolled back by
/// the `Transaction` `Drop` impl, so the connection state is still
/// safe to reuse — we simply unwrap the inner guard rather than
/// propagate the panic to every subsequent call.
pub(crate) fn lock_conn(&self) -> MutexGuard<'_, Connection> {
self.conn.lock().unwrap_or_else(|p| p.into_inner())
}
/// Persist a `RawAsset` *with its raw bytes*: row goes into `assets`,
/// bytes go to `data_dir/assets/<aa>/<asset_id>` if `byte_len ≤
/// copy_threshold_mb`, otherwise the row records the source URI's
@@ -101,6 +122,13 @@ impl SqliteStore {
asset: &kb_core::RawAsset,
bytes: &[u8],
) -> Result<()> {
// 0. Validate the AssetId shape before any I/O. `kb_core::AssetId`
// is a `pub String` newtype: `FromStr` enforces the 32-hex-char
// invariant, but a hand-constructed `AssetId("../etc/passwd…")`
// can bypass that and reach `assets_path_for`. Refuse such IDs at
// the store boundary to keep shard-dir slicing safe.
validate_asset_id(&asset.asset_id)?;
// 1. Verify the caller's checksum matches what's actually on the
// wire. A drift here means the bytes the parser saw and the bytes
// we're about to durably store disagree — refuse persistence.
@@ -116,54 +144,103 @@ impl SqliteStore {
// 2. Decide copy vs. reference. The threshold compares the
// declared `byte_len` (caller-vouched) rather than `bytes.len()`
// because some sources stream and `byte_len` is authoritative.
let (storage_kind, storage_path) = if asset.byte_len <= self.copy_threshold_bytes {
if asset.byte_len <= self.copy_threshold_bytes {
// Copy mode. To prevent file orphans on UPSERT failure we use
// the temp-file + atomic-rename pattern:
// (a) write bytes to `<final>.tmp.<pid>.<counter>`
// (b) fsync the temp file
// (c) UPSERT the row
// (d) on UPSERT success: rename temp → final (atomic on
// same fs)
// (e) on any failure between (a) and (d): best-effort delete
// of the temp file so we never leak bytes on disk.
let dest = self.assets_path_for(&asset.asset_id);
if let Some(parent) = dest.parent() {
std::fs::create_dir_all(parent).with_context(|| {
format!("create asset shard dir {}", parent.display())
})?;
}
std::fs::write(&dest, bytes)
.with_context(|| format!("write asset bytes to {}", dest.display()))?;
// Mirror §6.6: files 0o644.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&dest)?.permissions();
perms.set_mode(0o644);
std::fs::set_permissions(&dest, perms).with_context(|| {
format!("chmod 0o644 on {}", dest.display())
let temp_path = temp_path_for(&dest);
// Inline closure so any `?` in (a)/(b) cleans up the temp
// file before bubbling out.
let write_and_upsert = || -> Result<()> {
{
let mut f = std::fs::File::create(&temp_path).with_context(|| {
format!("create temp asset file {}", temp_path.display())
})?;
use std::io::Write;
f.write_all(bytes).with_context(|| {
format!("write asset bytes to {}", temp_path.display())
})?;
f.sync_all().with_context(|| {
format!("fsync temp asset file {}", temp_path.display())
})?;
}
// Mirror §6.6: files 0o644.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&temp_path)?.permissions();
perms.set_mode(0o644);
std::fs::set_permissions(&temp_path, perms).with_context(|| {
format!("chmod 0o644 on {}", temp_path.display())
})?;
}
// UPSERT the row first; only after a successful row write
// do we publish the file via rename. A second
// `put_asset_with_bytes` for the same asset_id overwrites
// in place.
{
let conn = self.lock_conn();
upsert_asset_row(
&conn,
asset,
"copied",
&dest.to_string_lossy(),
)?;
}
std::fs::rename(&temp_path, &dest).with_context(|| {
format!(
"atomic rename {} -> {}",
temp_path.display(),
dest.display()
)
})?;
Ok(())
};
match write_and_upsert() {
Ok(()) => Ok(()),
Err(e) => {
// Best-effort cleanup; ignore errors so the original
// failure (likely the more useful one) propagates.
let _ = std::fs::remove_file(&temp_path);
Err(e)
}
}
("copied", dest.to_string_lossy().into_owned())
} else {
// Reference: caller's source path is recorded verbatim. We
// accept either a `File(path)` or `Kb(uri)` SourceUri; the
// latter stores the raw `kb://...` string.
let path = match &asset.source_uri {
// latter stores the raw `kb://...` string. No file I/O ⇒ no
// orphan risk; just UPSERT the row.
let storage_path = match &asset.source_uri {
kb_core::SourceUri::File(p) => p.to_string_lossy().into_owned(),
kb_core::SourceUri::Kb(u) => u.clone(),
};
("reference", path)
};
// 3. UPSERT the assets row. A second `put_asset_with_bytes` for
// the same asset_id (e.g. re-ingest) overwrites in place — the
// row is uniquely keyed by asset_id and re-derived from the
// RawAsset every time.
let conn = self.conn.lock().expect("sqlite mutex poisoned");
upsert_asset_row(&conn, asset, storage_kind, &storage_path)?;
Ok(())
let conn = self.lock_conn();
upsert_asset_row(&conn, asset, "reference", &storage_path)?;
Ok(())
}
}
/// Compute the `data_dir/assets/<aa>/<asset_id>` path for an asset.
/// `<aa>` is the first [`ASSET_SHARD_LEN`] hex chars of `asset_id`.
///
/// Callers that build paths from caller-controlled IDs MUST first
/// invoke [`validate_asset_id`] (already enforced at every store
/// entry that takes a `RawAsset`). The `id.len() >= ASSET_SHARD_LEN`
/// guard below is a defense-in-depth fallback only.
pub(crate) fn assets_path_for(&self, asset_id: &kb_core::AssetId) -> PathBuf {
let id = &asset_id.0;
// Defensive: kb-core enforces 32 hex chars on AssetId construction
// (`FromStr` validates). If a future code path bypasses that, we
// fall back to the full id as the shard so we never panic on
// slicing.
let shard = if id.len() >= ASSET_SHARD_LEN {
&id[..ASSET_SHARD_LEN]
} else {
@@ -173,6 +250,38 @@ impl SqliteStore {
}
}
/// Reject an `AssetId` whose shape would let a malicious caller escape
/// the `data_dir/assets/<aa>/` shard tree. `kb_core::AssetId(pub String)`
/// permits hand-construction, so any function that turns an `AssetId`
/// into a filesystem path must call this first.
pub(crate) fn validate_asset_id(asset_id: &kb_core::AssetId) -> Result<()> {
if asset_id.0.len() != ASSET_ID_HEX_LEN
|| !asset_id.0.bytes().all(|b| b.is_ascii_hexdigit())
{
anyhow::bail!(
"invalid AssetId shape (expected {} ASCII hex chars): {:?}",
ASSET_ID_HEX_LEN,
asset_id.0
);
}
Ok(())
}
/// Compute a per-call temp-file path next to `dest` that is unlikely to
/// collide with any other in-flight writer (process pid + monotonic
/// counter). The temp file lives in the same parent directory so the
/// final `rename` is an atomic same-filesystem rename on POSIX.
fn temp_path_for(dest: &Path) -> PathBuf {
let pid = std::process::id();
let n = TEMP_SUFFIX_COUNTER.fetch_add(1, Ordering::Relaxed);
let parent = dest.parent().unwrap_or_else(|| Path::new("."));
let file_name = dest
.file_name()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_else(|| "asset".to_string());
parent.join(format!("{file_name}.tmp.{pid}.{n}"))
}
/// UPSERT a row into `assets`. Used by both the `put_asset_with_bytes`
/// path (which has bytes + computed `storage_kind/path`) and the
/// `DocumentStore::put_asset` path (which only has the `RawAsset` and

View File

@@ -99,6 +99,120 @@ fn reference_mode_does_not_write_file_but_records_path() {
assert_eq!(storage_path, "/path/to/original.md");
}
#[test]
fn put_asset_with_bytes_orphan_cleanup_on_upsert_failure() {
// Goal: prove that if the row UPSERT fails AFTER the bytes have been
// staged on disk, no `<aa>/<asset_id>` file is left behind.
//
// Lever: the `assets` table has a UNIQUE INDEX on `workspace_path`
// (V001), but the UPSERT is `ON CONFLICT(asset_id)`. So if some other
// row already owns this `workspace_path`, the INSERT half of the
// UPSERT trips a UNIQUE constraint that the ON CONFLICT clause does
// NOT handle — UPSERT errors. The new asset's bytes were already
// staged; we assert they are NOT visible at the final destination.
let env = common::TestEnv::with_threshold(100);
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
// Pre-populate a row that owns `notes/foo.md` (the workspace_path our
// fixture asset will also claim) under a *different* asset_id.
env.with_conn(|c| {
c.execute(
"INSERT INTO assets (
asset_id, source_uri, workspace_path, media_type, byte_len,
checksum, storage_kind, storage_path, discovered_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
rusqlite::params![
"b".repeat(32),
"file:///elsewhere/foo.md",
"notes/foo.md",
"\"markdown\"",
7_i64,
"0".repeat(64),
"reference",
"/elsewhere/foo.md",
"2024-01-01T00:00:00Z",
],
)
});
let bytes = b"hello, sqlite";
let cs = b3_full_hex(bytes);
let asset = fixed_asset(bytes, bytes.len() as u64, &cs);
let err = store
.put_asset_with_bytes(&asset, bytes)
.expect_err("UPSERT must fail on workspace_path UNIQUE violation");
let msg = format!("{err:#}");
assert!(
msg.to_lowercase().contains("unique") || msg.to_lowercase().contains("constraint"),
"expected UNIQUE constraint failure, got: {msg}"
);
// Final destination must NOT exist (no orphan).
let aa = &asset.asset_id.0[..2];
let dest = env.data_dir().join("assets").join(aa).join(&asset.asset_id.0);
assert!(
!dest.exists(),
"asset bytes were left orphan at {} after UPSERT failure",
dest.display()
);
// No `*.tmp.*` either — temp file must be cleaned up too.
let shard_dir = env.data_dir().join("assets").join(aa);
if let Ok(entries) = std::fs::read_dir(&shard_dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let s = name.to_string_lossy();
assert!(
!s.contains(".tmp."),
"temp file leaked at {}",
entry.path().display()
);
}
}
}
#[test]
fn put_asset_with_bytes_rejects_invalid_asset_id() {
// `kb_core::AssetId(pub String)` lets a hand-construction bypass the
// 32-hex `FromStr` invariant. The store boundary must reject any ID
// whose shape would let path construction escape `data_dir/assets/`.
let env = common::TestEnv::with_threshold(100);
let store = SqliteStore::open(&env.config()).unwrap();
store.run_migrations().unwrap();
// 32 chars but contains a `/` — would let `assets_path_for` stitch
// together a path outside the shard tree.
let evil_id = "../etc/passwd_padded_to_xx_xxxxx".to_string();
assert_eq!(evil_id.len(), 32, "test fixture must be 32 chars to exercise length-only checks");
let mut asset = fixed_asset(b"x", 1, &b3_full_hex(b"x"));
asset.asset_id = AssetId(evil_id.clone());
let err = store
.put_asset_with_bytes(&asset, b"x")
.expect_err("must reject non-hex AssetId");
let msg = format!("{err:#}");
assert!(
msg.contains("invalid AssetId shape"),
"expected AssetId-shape rejection, got: {msg}"
);
// And the bytes must NOT have been staged anywhere under the assets
// tree (no I/O should have happened before validation).
let assets_dir = env.data_dir().join("assets");
if assets_dir.exists() {
for entry in std::fs::read_dir(&assets_dir).unwrap().flatten() {
// Recurse one level into shard dirs and assert empty.
if let Some(sub) = std::fs::read_dir(entry.path()).unwrap().flatten().next() {
panic!(
"invalid AssetId still produced filesystem artifact at {}",
sub.path().display()
);
}
}
}
}
#[test]
fn checksum_mismatch_returns_conflict() {
let env = common::TestEnv::new();