Files
inkling/src/main/ai/AiWorker.ts
altair823 83cefccbdd fix(v032): AiWorker Promise.all closure type narrowing 회복
Task 6 의 Promise.all 도입 시 async callback closure 가 this.telemetry?
narrowing 잃어 TS2532 발생. const telemetry = this.telemetry 로 narrowed
reference capture 후 closure 안에서 사용.
2026-05-10 14:25:24 +09:00

272 lines
11 KiB
TypeScript

import { readFile } from 'node:fs/promises';
import type { NoteRepository } from '../repository/NoteRepository.js';
import type { Note } from '@shared/types';
import type { AiFailedReason } from '../services/telemetryEvents.js';
import type { SettingsService } from '../services/SettingsService.js';
import type { MediaStore } from '../services/MediaStore.js';
import { ProviderHolder } from './ProviderHolder.js';
import { parseAllCandidates } from '../services/dueDateParser.js';
import { ZodError } from 'zod';
import { kstTodayAsDate, kstTodayIso } from '../../shared/util/kstDate.js';
// v0.2.6 #29 — backlog 의 top-N 튜닝은 dogfood telemetry 후 (현재 magic 만 추출).
const VOCAB_TOP_N = 20;
function classifyReason(err: unknown): AiFailedReason {
if (err instanceof ZodError) return 'schema';
const msg = err instanceof Error ? err.message.toLowerCase() : String(err).toLowerCase();
if (msg.includes('econnrefused') || msg.includes('enotfound') || msg.includes('fetch failed') || msg.includes('econnreset') || msg.includes('unreachable')) {
return 'unreachable';
}
if (msg.includes('timeout') || msg.includes('timedout') || msg.includes('aborted')) {
return 'timeout';
}
return 'other';
}
export interface AiTelemetryEmitter {
emit(input:
| { kind: 'ai_succeeded'; payload: { noteId: string; durationMs: number; attempts: number } }
| { kind: 'ai_failed'; payload: { noteId: string; reason: AiFailedReason; attempts: number } }
| { kind: 'tag_vocab_hit'; payload: { tagId: number; vocabSize: number } }
| { kind: 'tag_vocab_miss'; payload: { vocabSize: number } }
): Promise<void>;
}
export interface AiWorkerOptions {
backoffsMs?: number[];
unreachableBackoffsMs?: number[];
onUpdate?: (note: Note) => void;
logger?: {
info: (msg: string, meta?: Record<string, unknown>) => void;
warn: (msg: string, meta?: Record<string, unknown>) => void;
error: (msg: string, meta?: Record<string, unknown>) => void;
};
now?: () => Date;
telemetry?: AiTelemetryEmitter;
/** v0.3.1 Cut F — vision 지원. 미전달 시 vision 비활성. */
settings?: Pick<SettingsService, 'getVisionModel'>;
/** v0.3.1 Cut F — 첨부 이미지 절대경로 변환. settings 와 함께 전달 시 vision 활성. */
mediaStore?: Pick<MediaStore, 'absolutePath'>;
}
interface Job { noteId: string; attempts: number; }
export class AiWorker {
private queue: Job[] = [];
private running = false;
private drainResolvers: Array<() => void> = [];
private backoffsMs: number[];
private unreachableBackoffsMs: number[];
private unreachableBackoffStep = 0;
private onUpdate?: (note: Note) => void;
private logger: NonNullable<AiWorkerOptions['logger']>;
private now: () => Date;
private telemetry?: AiTelemetryEmitter;
private settings?: Pick<SettingsService, 'getVisionModel'>;
private mediaStore?: Pick<MediaStore, 'absolutePath'>;
constructor(
private repo: NoteRepository,
private holder: ProviderHolder,
opts: AiWorkerOptions = {}
) {
this.backoffsMs = opts.backoffsMs ?? [0, 30_000, 120_000];
this.unreachableBackoffsMs = opts.unreachableBackoffsMs ?? [30_000, 60_000, 120_000, 240_000, 480_000, 900_000];
this.onUpdate = opts.onUpdate;
this.logger = opts.logger ?? { info: () => {}, warn: () => {}, error: () => {} };
this.now = opts.now ?? (() => new Date());
this.telemetry = opts.telemetry;
this.settings = opts.settings;
this.mediaStore = opts.mediaStore;
}
async enqueue(noteId: string): Promise<void> {
this.queue.push({ noteId, attempts: 0 });
this.kick();
}
async loadFromDb(): Promise<void> {
for (const j of this.repo.getAllPendingJobs()) {
this.queue.push({ noteId: j.noteId, attempts: j.attempts });
}
this.kick();
}
async drain(): Promise<void> {
if (!this.running && this.queue.length === 0) return;
await new Promise<void>((resolve) => {
this.drainResolvers.push(resolve);
this.kick();
});
}
private kick(): void {
if (this.running) return;
if (this.queue.length === 0) { this.resolveDrainers(); return; }
this.running = true;
void this.loop();
}
private async loop(): Promise<void> {
try {
while (this.queue.length > 0) {
const job = this.queue.shift()!;
await this.processJob(job);
}
} finally {
this.running = false;
this.resolveDrainers();
}
}
private resolveDrainers(): void {
const r = this.drainResolvers.splice(0);
for (const fn of r) fn();
}
private async processJob(job: Job): Promise<void> {
// `max` 는 schema/other 분기 (attempts 증가) 의 cap 이다.
// unreachable/timeout 분기는 `attempt -= 1; continue` 로 인덱스 stay — max 와 무관 무한 retry.
const max = this.backoffsMs.length;
for (let attempt = job.attempts; attempt < max; attempt++) {
const startMs = this.now().getTime();
try {
const note = this.repo.findById(job.noteId);
if (!note || note.deletedAt !== null || note.aiStatus !== 'pending') return;
const nowDate = this.now();
const todayDate = kstTodayAsDate(nowDate);
const todayIso = kstTodayIso(nowDate);
const candidates = parseAllCandidates(note.rawText, todayDate);
const vocab = this.repo.getTopUsedTags(VOCAB_TOP_N);
// v0.3.1 Cut F — vision path: visionModel + note.media → base64 images
// final review fix: note.media[].bytes 로 fast-fail (readFile/base64 비용 회피).
// 5MB cap 초과 시 throw → AiWorker 의 'other' 분기 → markAiFailed 도달.
const visionModel = this.settings ? await this.settings.getVisionModel() : null;
let images: Array<{ base64: string; mime: string }> | undefined;
if (visionModel && note.media.length > 0 && this.mediaStore) {
const oversize = note.media.find((m) => m.bytes > 5 * 1024 * 1024);
if (oversize) {
throw new Error(`image ${oversize.relPath} exceeds 5MB cap (${oversize.bytes} bytes)`);
}
images = await Promise.all(
note.media.map(async (m) => {
const buf = await readFile(this.mediaStore!.absolutePath(m.relPath));
return { base64: buf.toString('base64'), mime: m.mime };
})
);
}
const res = await this.holder.get().generate(
{ text: note.rawText, images, todayKst: todayIso, dueDateCandidates: candidates, vocab },
{ visionModel: visionModel ?? undefined }
);
// AI primary: AI's dueDate is final (no rule merge)
this.repo.updateAiResult(job.noteId, {
title: res.title,
summary: res.summary,
tags: res.tags,
provider: this.holder.get().name,
dueDate: res.dueDate ?? null
});
this.unreachableBackoffStep = 0; // 성공 시 step reset
this.logger.info('ai.done', {
noteId: job.noteId,
attempt,
dueDateSource: res.dueDate !== null ? 'ai' : 'none',
candidatesCount: candidates.length
});
if (this.telemetry) {
const telemetry = this.telemetry;
await telemetry.emit({
kind: 'ai_succeeded',
payload: {
noteId: job.noteId,
durationMs: this.now().getTime() - startMs,
attempts: attempt + 1
}
}).catch(() => {});
// v0.2.3 #3 — per-tag vocab hit/miss 분류 (updateAiResult 후 → tagId 보장)
// dedup: AI 응답에 같은 태그 중복 가능 — INSERT OR IGNORE 와 정합한 1-emit/태그 보장
const vocabSet = new Set(vocab.map((v) => v.toLowerCase()));
await Promise.all(
Array.from(new Set(res.tags)).map(async (tagName) => {
if (vocabSet.has(tagName.toLowerCase())) {
const tagId = this.repo.getTagIdByName(tagName);
if (tagId !== null) {
await telemetry.emit({
kind: 'tag_vocab_hit',
payload: { tagId, vocabSize: vocab.length }
}).catch(() => {});
}
} else {
await telemetry.emit({
kind: 'tag_vocab_miss',
payload: { vocabSize: vocab.length }
}).catch(() => {});
}
})
);
}
this.emit(job.noteId);
return;
} catch (err) {
const reason = classifyReason(err);
const msg = (err as Error).message;
this.logger.warn('ai.retry', { noteId: job.noteId, attempt, err: msg, reason });
if (reason === 'unreachable' || reason === 'timeout') {
// 무한 retry: attempts 증가 안 함, in-place loop + sleep.
// markAiFailed / ai_failed emit 안 함 — ratio 통계는 schema/other 만 누적.
const sleepMs = this.nextBackoffMs(this.unreachableBackoffStep);
// step 이 cap 도달 후엔 인덱스 stay — increment 는 무의미하지만 안전한 no-op.
// (Math.min 가드: cap 넘어가도 length-1 로 묶임.)
if (this.unreachableBackoffStep < this.unreachableBackoffsMs.length - 1) {
this.unreachableBackoffStep += 1;
}
const nextRunAt = new Date(Date.now() + sleepMs).toISOString();
this.repo.setNextRunAt(job.noteId, nextRunAt, msg);
await this.sleep(sleepMs);
attempt -= 1; // for 루프 attempt++ 상쇄 — 같은 attempt 인덱스로 재시도
continue;
}
// schema / other: 기존 max 3 retry 정책
const isLast = attempt === max - 1;
const nextRunAt = new Date(Date.now() + (this.backoffsMs[attempt + 1] ?? 0)).toISOString();
this.repo.incrementJobAttempt(job.noteId, nextRunAt, msg);
if (isLast) {
this.repo.markAiFailed(job.noteId, msg);
this.logger.error('ai.failed', { noteId: job.noteId, err: msg });
if (this.telemetry) {
await this.telemetry.emit({
kind: 'ai_failed',
payload: {
noteId: job.noteId,
reason,
attempts: attempt + 1
}
}).catch(() => {});
}
this.emit(job.noteId);
return;
}
await this.sleep(this.backoffsMs[attempt + 1] ?? 0);
}
}
}
private nextBackoffMs(step: number): number {
const idx = Math.min(step, this.unreachableBackoffsMs.length - 1);
return this.unreachableBackoffsMs[idx]!;
}
private emit(noteId: string): void {
if (!this.onUpdate) return;
const note = this.repo.findById(noteId);
if (note) this.onUpdate(note);
}
private sleep(ms: number): Promise<void> {
if (ms <= 0) return Promise.resolve();
return new Promise((r) => setTimeout(r, ms));
}
}