feat(app): IngestEvent + ingest_with_config_progress (p9-fb-01) #52

Merged
altair823 merged 2 commits from feat/p9-fb-01-progress into main 2026-05-02 19:47:50 +00:00
Owner

Summary

Spec PR #51 (frozen design §2.4a + §10 + wire schema ingest_progress.v1) 의 첫 impl PR. kebab-app 가 progress event 를 mpsc::Sender<IngestEvent> 로 흘려보내는 surface 를 noop 까지 갖춤. 후속 PR 두 개 (p9-fb-02 CLI display, p9-fb-03 TUI background) 가 이 stream 을 소비.

변경

신규 모듈 crates/kebab-app/src/ingest_progress.rs

  • IngestEvent enum — #[serde(tag = "kind", rename_all = "snake_case")] 로 직렬화, wire schema ingest_progress.v1 와 1:1.
  • variants: ScanStarted / ScanCompleted / AssetStarted / AssetFinished / Completed / Aborted. Aborted 는 p9-fb-04 (cancel) 가 트리거 — 본 PR 에서는 미발신.
  • AggregateCounts struct: Completed / Aborted 의 terminal 집계.
  • media_label(MediaType) -> &'static str: §2.4a 의 short label (markdown / pdf / image / audio / other).
  • emit(progress, event): best-effort send. dropped receiver 는 silent absorb — ingest hot path 가 slow consumer 에 stall 안 함.

facade

#[doc(hidden)]
pub fn ingest_with_config_progress(
    config: kebab_config::Config,
    scope: SourceScope,
    summary_only: bool,
    progress: Option<std::sync::mpsc::Sender<IngestEvent>>,
) -> anyhow::Result<IngestReport>;

기존 ingest_with_configprogress = None forwarding wrapper. CLI / TUI / 외부 caller 가 stream 원하면 _progress 직접 호출.

발신 site

  • 함수 시작: ScanStarted { root }.
  • 워크스페이스 walk 직후: ScanCompleted { total }.
  • asset loop 매 iter 시작: AssetStarted { idx, total, path, media } (1-based idx).
  • asset loop 매 iter 끝: AssetFinished { idx, total, result, chunks }.
  • 함수 끝 (report 반환 직전): Completed { counts: AggregateCounts }.

embed_batch_* 는 §2.4a 의 "asset 이벤트 사이 임의 위치" — v1 단순화로 미발신. asset 단위 해상도면 CLI spinner / TUI status bar 충분. 후속 task 가 필요 판단 시 추가.

미적용 (별 task)

  • IngestEvent::Aborted 발신 (cancel token wiring) → p9-fb-04
  • embed_batch_started / embed_batch_finished → 후속 (수요 발생 시)

Test plan

  • cargo test -p kebab-app --lib ingest_progress — 6 PASS (media_label / serde discriminator / serde Completed counts / emit None / emit dropped / emit live).
  • cargo test -p kebab-app --test ingest_progress — 3 PASS (§2.4a invariant 의 sequence / forwarding wrapper / dropped receiver tolerance).
  • cargo clippy -p kebab-app --all-targets -- -D warnings clean.

Plan 갱신

tasks/p9/p9-fb-01-ingest-progress-callback.md frontmatter status: plannedstatus: in_progress. 머지 후 별도 한 줄 commit 으로 completed flip.

## Summary Spec PR #51 (frozen design §2.4a + §10 + wire schema `ingest_progress.v1`) 의 첫 impl PR. `kebab-app` 가 progress event 를 `mpsc::Sender<IngestEvent>` 로 흘려보내는 surface 를 noop 까지 갖춤. 후속 PR 두 개 (p9-fb-02 CLI display, p9-fb-03 TUI background) 가 이 stream 을 소비. ## 변경 ### 신규 모듈 `crates/kebab-app/src/ingest_progress.rs` - `IngestEvent` enum — `#[serde(tag = "kind", rename_all = "snake_case")]` 로 직렬화, wire schema `ingest_progress.v1` 와 1:1. - variants: `ScanStarted` / `ScanCompleted` / `AssetStarted` / `AssetFinished` / `Completed` / `Aborted`. `Aborted` 는 p9-fb-04 (cancel) 가 트리거 — 본 PR 에서는 미발신. - `AggregateCounts` struct: `Completed` / `Aborted` 의 terminal 집계. - `media_label(MediaType) -> &'static str`: §2.4a 의 short label (`markdown` / `pdf` / `image` / `audio` / `other`). - `emit(progress, event)`: best-effort send. dropped receiver 는 silent absorb — ingest hot path 가 slow consumer 에 stall 안 함. ### facade ```rust #[doc(hidden)] pub fn ingest_with_config_progress( config: kebab_config::Config, scope: SourceScope, summary_only: bool, progress: Option<std::sync::mpsc::Sender<IngestEvent>>, ) -> anyhow::Result<IngestReport>; ``` 기존 `ingest_with_config` 는 `progress = None` forwarding wrapper. CLI / TUI / 외부 caller 가 stream 원하면 `_progress` 직접 호출. ### 발신 site - 함수 시작: `ScanStarted { root }`. - 워크스페이스 walk 직후: `ScanCompleted { total }`. - asset loop 매 iter 시작: `AssetStarted { idx, total, path, media }` (1-based idx). - asset loop 매 iter 끝: `AssetFinished { idx, total, result, chunks }`. - 함수 끝 (report 반환 직전): `Completed { counts: AggregateCounts }`. embed_batch_* 는 §2.4a 의 \"asset 이벤트 사이 임의 위치\" — v1 단순화로 미발신. asset 단위 해상도면 CLI spinner / TUI status bar 충분. 후속 task 가 필요 판단 시 추가. ## 미적용 (별 task) - `IngestEvent::Aborted` 발신 (cancel token wiring) → p9-fb-04 - embed_batch_started / embed_batch_finished → 후속 (수요 발생 시) ## Test plan - [x] `cargo test -p kebab-app --lib ingest_progress` — 6 PASS (media_label / serde discriminator / serde Completed counts / emit None / emit dropped / emit live). - [x] `cargo test -p kebab-app --test ingest_progress` — 3 PASS (§2.4a invariant 의 sequence / forwarding wrapper / dropped receiver tolerance). - [x] `cargo clippy -p kebab-app --all-targets -- -D warnings` clean. ## Plan 갱신 `tasks/p9/p9-fb-01-ingest-progress-callback.md` frontmatter `status: planned` → `status: in_progress`. 머지 후 별도 한 줄 commit 으로 `completed` flip.
altair823 added 1 commit 2026-05-02 19:45:09 +00:00
Streaming progress channel for ingest. Facade emits one IngestEvent per
step boundary into an optional `mpsc::Sender<IngestEvent>` injected by
the caller. CLI (p9-fb-02), TUI (p9-fb-03), and future desktop UI all
consume the same stream.

신규:
- crates/kebab-app/src/ingest_progress.rs: `IngestEvent` enum (`#[serde(tag
  = "kind", rename_all = "snake_case")]` matching wire schema
  ingest_progress.v1) + `AggregateCounts` struct + `media_label` helper
  + best-effort `emit` helper.
- ingest_with_config_progress(cfg, scope, summary_only, progress) —
  존재 시 `mpsc::Sender<IngestEvent>` 로 ScanStarted → ScanCompleted →
  (AssetStarted < AssetFinished)* → Completed 발신. dropped receiver
  는 silent absorb (hot path stall 금지).
- 기존 ingest_with_config 가 `progress=None` forwarding wrapper.

미적용 (계약 상 향후 task 가 채움):
- IngestEvent::Aborted: cancel token wiring 은 p9-fb-04.
- embed_batch_started / embed_batch_finished: spec 의 \"asset 이벤트 사이
  임의 위치\" 에 해당. v1 단순화 — asset 단위 해상도면 CLI / TUI 충분.

Test:
- 6 lib unit (media_label / serde discriminator / emit corner cases).
- 3 integration (이벤트 sequence 가 §2.4a invariant 준수 / forwarding
  wrapper / dropped receiver tolerance).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
claude-reviewer-01 requested changes 2026-05-02 19:46:53 +00:00
Dismissed
claude-reviewer-01 left a comment
Member

회차 1 — nit 1건 + 칭찬 4건. 첫 impl PR 이라 표면 균질.

핵심 actionable:

  1. emittracing::trace! 메시지 두 번째 절 ("suppressing further sends would require caller cooperation") 모호 — 단순 "receiver dropped; event discarded" 정도로 단축 권장.

총평: spec PR #51 의 §2.4a 계약 (ordering invariant, best-effort send, embed_batch optional, Aborted 가 별 task 의 책임) 이 코드 + test + doc 세 곳에 정확히 매핑. forwarding wrapper 패턴이 backward-compat shim 이 아닌 entry-point 조합 — facade rule 준수. integration test 가 §2.4a invariant 자체를 코드화 — 미래 emit 위치 변경 시 즉시 잡힘. 위 nit 1건만 정리하면 머지.

회차 1 — nit 1건 + 칭찬 4건. 첫 impl PR 이라 표면 균질. 핵심 actionable: 1. **`emit` 의 `tracing::trace!` 메시지 두 번째 절 (\"suppressing further sends would require caller cooperation\") 모호** — 단순 \"receiver dropped; event discarded\" 정도로 단축 권장. 총평: spec PR #51 의 §2.4a 계약 (ordering invariant, best-effort send, embed_batch optional, Aborted 가 별 task 의 책임) 이 코드 + test + doc 세 곳에 정확히 매핑. forwarding wrapper 패턴이 backward-compat shim 이 아닌 entry-point 조합 — facade rule 준수. integration test 가 §2.4a invariant 자체를 코드화 — 미래 emit 위치 변경 시 즉시 잡힘. 위 nit 1건만 정리하면 머지.
@@ -0,0 +67,4 @@
/// Finished processing the `idx`-th asset. `result` mirrors the
/// asset's `IngestItemKind`; `chunks` is the number of chunks
/// produced (0 for `Skipped` / `Error`).
AssetFinished {

(칭찬) #[serde(tag = "kind", rename_all = "snake_case")] 가 wire schema ingest_progress.v1 의 discriminator + 필드명과 1:1 일치 — kind = "asset_started" / idx / total / path / media 자동. CLI 의 wire layer 가 schema_version 만 한 줄 더 붙이면 spec §2.4a 와 동일 JSON 산출. 미래 변경 시 (variants 추가 / 이름 변경) serde tag 만 보면 wire 영향 즉시 추적.

(칭찬) `#[serde(tag = "kind", rename_all = "snake_case")]` 가 wire schema `ingest_progress.v1` 의 discriminator + 필드명과 1:1 일치 — `kind = "asset_started"` / `idx` / `total` / `path` / `media` 자동. CLI 의 wire layer 가 `schema_version` 만 한 줄 더 붙이면 spec §2.4a 와 동일 JSON 산출. 미래 변경 시 (variants 추가 / 이름 변경) serde tag 만 보면 wire 영향 즉시 추적.
@@ -0,0 +116,4 @@
use super::*;
use kebab_core::MediaType;
#[test]

(칭찬) emit helper 가 Option<&Sender> 받아 None 단축 + dropped receiver silent absorb 두 corner case 를 한 함수에 묶음 + lib unit test 3 개 (None / dropped / live) 가 각 분기 cover. ingest 의 hot path 에서 emit 호출 자체가 if-let 한 줄이라 noop overhead 0. integration test 의 "dropped receiver tolerance" 와 짝.

(칭찬) `emit` helper 가 `Option<&Sender>` 받아 None 단축 + dropped receiver silent absorb 두 corner case 를 한 함수에 묶음 + lib unit test 3 개 (None / dropped / live) 가 각 분기 cover. ingest 의 hot path 에서 emit 호출 자체가 if-let 한 줄이라 noop overhead 0. integration test 의 "dropped receiver tolerance" 와 짝.
@@ -0,0 +124,4 @@
media_label(&MediaType::Image(kebab_core::ImageType::Png)),
"image"
);
assert_eq!(

(nit / 메시지 모호) trace! 메시지 progress receiver dropped; suppressing further sends would require caller cooperation — 두 번째 절이 의미 불명확 ("caller cooperation" 이 어떻게 suppress 를 가능하게 하는가?). 단순 receiver drop 사실만 기록하면 충분.

Why: 이 trace! 는 producer 측 best-effort fail 의 원인을 추적하는 용도. "caller cooperation" 추측은 미래 reader 에게 노이즈 — 사실 ("receiver dropped") + 영향 ("event discarded") 만 기록.

How to apply: 메시지를 progress receiver dropped; event discarded (best-effort send per ingest_progress contract) 정도로 단순화. 또는 더 짧게 progress receiver dropped.

(nit / 메시지 모호) trace! 메시지 `progress receiver dropped; suppressing further sends would require caller cooperation` — 두 번째 절이 의미 불명확 ("caller cooperation" 이 어떻게 suppress 를 가능하게 하는가?). 단순 receiver drop 사실만 기록하면 충분. Why: 이 trace! 는 producer 측 best-effort fail 의 원인을 추적하는 용도. "caller cooperation" 추측은 미래 reader 에게 노이즈 — 사실 ("receiver dropped") + 영향 ("event discarded") 만 기록. How to apply: 메시지를 `progress receiver dropped; event discarded (best-effort send per ingest_progress contract)` 정도로 단순화. 또는 더 짧게 `progress receiver dropped`.

(칭찬) 기존 ingest_with_config(...)_progress(.., None) forwarding wrapper 로 단순화 — 본 PR scope 안에서 호출자 (CLI / TUI / integration test) 변경 0. forwarding 패턴이 backward-compat shim 이 아니라 "progress 의식 없는 caller 의 자연 entry point" 로 doc comment 가 의도 명시. CLAUDE.md 의 "backward-compat shim 금지" 룰 정합 — facade 의 의도된 surface 조합.

(칭찬) 기존 `ingest_with_config(...)` 가 `_progress(.., None)` forwarding wrapper 로 단순화 — 본 PR scope 안에서 호출자 (CLI / TUI / integration test) 변경 0. forwarding 패턴이 backward-compat shim 이 아니라 "progress 의식 없는 caller 의 자연 entry point" 로 doc comment 가 의도 명시. CLAUDE.md 의 "backward-compat shim 금지" 룰 정합 — facade 의 의도된 surface 조합.

(칭찬) integration test progress_event_sequence_matches_design_section_2_4a 가 §2.4a 의 ordering invariant (ScanStarted < ScanCompleted < (AssetStarted < AssetFinished)* < Completed) 를 코드로 코드화 + monotonic idx + Started/Finished 1:1 pairing 까지 assert. 미래 누군가 새 step 추가 또는 emit 위치 옮기면 이 테스트가 즉시 실패 — spec 변경 없이 invariant drift 막음. 회귀 안전망 정확히 spec 옆에서.

(칭찬) integration test `progress_event_sequence_matches_design_section_2_4a` 가 §2.4a 의 ordering invariant (`ScanStarted < ScanCompleted < (AssetStarted < AssetFinished)* < Completed`) 를 코드로 코드화 + monotonic idx + Started/Finished 1:1 pairing 까지 assert. 미래 누군가 새 step 추가 또는 emit 위치 옮기면 이 테스트가 즉시 실패 — spec 변경 없이 invariant drift 막음. 회귀 안전망 정확히 spec 옆에서.
altair823 added 1 commit 2026-05-02 19:47:26 +00:00
회차 1 nit 반영. 'suppressing further sends would require caller
cooperation' 의 두 번째 절이 의미 불명확 — 단순 'event discarded
(best-effort send per ingest_progress contract)' 로 단축. 사실 +
영향만 기록, caller 가 어떻게 cooperate 하는지 추측은 제거.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
claude-reviewer-01 approved these changes 2026-05-02 19:47:33 +00:00
claude-reviewer-01 left a comment
Member

회차 2 — 회차 1 nit 1건 (emit trace 메시지 단순화) 정확히 반영. 추가 actionable 0. APPROVE.

회차 2 — 회차 1 nit 1건 (emit trace 메시지 단순화) 정확히 반영. 추가 actionable 0. APPROVE.
altair823 merged commit 7c6009f7e7 into main 2026-05-02 19:47:50 +00:00
altair823 deleted branch feat/p9-fb-01-progress 2026-05-02 19:47:51 +00:00
Sign in to join this conversation.
No Reviewers
No Label
2 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: altair823-org/kebab#52