- ProviderHolder: mutable holder + listeners, indirection layer - AiWorker: constructor InferenceProvider → ProviderHolder this.provider.x → this.holder.get().x 전환 - HealthChecker: 동일 패턴 - src/main/index.ts: provider 를 ProviderHolder 로 감싸서 생성 - 기존 AiWorker / HealthChecker 테스트의 constructor 호출에 ProviderHolder wrap - 단위 +2 cases (ProviderHolder) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
250 lines
9.1 KiB
TypeScript
250 lines
9.1 KiB
TypeScript
import type { NoteRepository } from '../repository/NoteRepository.js';
|
|
import type { Note } from '@shared/types';
|
|
import { ProviderHolder } from './ProviderHolder.js';
|
|
import { parseAllCandidates } from '../services/dueDateParser.js';
|
|
import { ZodError } from 'zod';
|
|
|
|
const KST_OFFSET_MS = 9 * 60 * 60 * 1000;
|
|
|
|
function todayKstAsDate(now: Date): Date {
|
|
// Returns a Date object whose UTC year/month/day match KST today
|
|
const k = new Date(now.getTime() + KST_OFFSET_MS);
|
|
return new Date(Date.UTC(k.getUTCFullYear(), k.getUTCMonth(), k.getUTCDate()));
|
|
}
|
|
|
|
function todayKstAsIso(now: Date): string {
|
|
return todayKstAsDate(now).toISOString().slice(0, 10);
|
|
}
|
|
|
|
function classifyReason(err: unknown): 'unreachable' | 'schema' | 'timeout' | 'other' {
|
|
if (err instanceof ZodError) return 'schema';
|
|
const msg = err instanceof Error ? err.message.toLowerCase() : String(err).toLowerCase();
|
|
if (msg.includes('econnrefused') || msg.includes('enotfound') || msg.includes('fetch failed') || msg.includes('econnreset') || msg.includes('unreachable')) {
|
|
return 'unreachable';
|
|
}
|
|
if (msg.includes('timeout') || msg.includes('timedout') || msg.includes('aborted')) {
|
|
return 'timeout';
|
|
}
|
|
return 'other';
|
|
}
|
|
|
|
export interface AiTelemetryEmitter {
|
|
emit(input:
|
|
| { kind: 'ai_succeeded'; payload: { noteId: string; durationMs: number; attempts: number } }
|
|
| { kind: 'ai_failed'; payload: { noteId: string; reason: 'unreachable' | 'schema' | 'timeout' | 'other'; attempts: number } }
|
|
| { kind: 'tag_vocab_hit'; payload: { tagId: number; vocabSize: number } }
|
|
| { kind: 'tag_vocab_miss'; payload: { vocabSize: number } }
|
|
): Promise<void>;
|
|
}
|
|
|
|
export interface AiWorkerOptions {
|
|
backoffsMs?: number[];
|
|
unreachableBackoffsMs?: number[];
|
|
onUpdate?: (note: Note) => void;
|
|
logger?: {
|
|
info: (msg: string, meta?: Record<string, unknown>) => void;
|
|
warn: (msg: string, meta?: Record<string, unknown>) => void;
|
|
error: (msg: string, meta?: Record<string, unknown>) => void;
|
|
};
|
|
now?: () => Date;
|
|
telemetry?: AiTelemetryEmitter;
|
|
}
|
|
|
|
interface Job { noteId: string; attempts: number; }
|
|
|
|
export class AiWorker {
|
|
private queue: Job[] = [];
|
|
private running = false;
|
|
private drainResolvers: Array<() => void> = [];
|
|
private backoffsMs: number[];
|
|
private unreachableBackoffsMs: number[];
|
|
private unreachableBackoffStep = 0;
|
|
private onUpdate?: (note: Note) => void;
|
|
private logger: NonNullable<AiWorkerOptions['logger']>;
|
|
private now: () => Date;
|
|
private telemetry?: AiTelemetryEmitter;
|
|
|
|
constructor(
|
|
private repo: NoteRepository,
|
|
private holder: ProviderHolder,
|
|
opts: AiWorkerOptions = {}
|
|
) {
|
|
this.backoffsMs = opts.backoffsMs ?? [0, 30_000, 120_000];
|
|
this.unreachableBackoffsMs = opts.unreachableBackoffsMs ?? [30_000, 60_000, 120_000, 240_000, 480_000, 900_000];
|
|
this.onUpdate = opts.onUpdate;
|
|
this.logger = opts.logger ?? { info: () => {}, warn: () => {}, error: () => {} };
|
|
this.now = opts.now ?? (() => new Date());
|
|
this.telemetry = opts.telemetry;
|
|
}
|
|
|
|
async enqueue(noteId: string): Promise<void> {
|
|
this.queue.push({ noteId, attempts: 0 });
|
|
this.kick();
|
|
}
|
|
|
|
async loadFromDb(): Promise<void> {
|
|
for (const j of this.repo.getAllPendingJobs()) {
|
|
this.queue.push({ noteId: j.noteId, attempts: j.attempts });
|
|
}
|
|
this.kick();
|
|
}
|
|
|
|
async drain(): Promise<void> {
|
|
if (!this.running && this.queue.length === 0) return;
|
|
await new Promise<void>((resolve) => {
|
|
this.drainResolvers.push(resolve);
|
|
this.kick();
|
|
});
|
|
}
|
|
|
|
private kick(): void {
|
|
if (this.running) return;
|
|
if (this.queue.length === 0) { this.resolveDrainers(); return; }
|
|
this.running = true;
|
|
void this.loop();
|
|
}
|
|
|
|
private async loop(): Promise<void> {
|
|
try {
|
|
while (this.queue.length > 0) {
|
|
const job = this.queue.shift()!;
|
|
await this.processJob(job);
|
|
}
|
|
} finally {
|
|
this.running = false;
|
|
this.resolveDrainers();
|
|
}
|
|
}
|
|
|
|
private resolveDrainers(): void {
|
|
const r = this.drainResolvers.splice(0);
|
|
for (const fn of r) fn();
|
|
}
|
|
|
|
private async processJob(job: Job): Promise<void> {
|
|
// `max` 는 schema/other 분기 (attempts 증가) 의 cap 이다.
|
|
// unreachable/timeout 분기는 `attempt -= 1; continue` 로 인덱스 stay — max 와 무관 무한 retry.
|
|
const max = this.backoffsMs.length;
|
|
for (let attempt = job.attempts; attempt < max; attempt++) {
|
|
const startMs = this.now().getTime();
|
|
try {
|
|
const note = this.repo.findById(job.noteId);
|
|
if (!note || note.deletedAt !== null || note.aiStatus !== 'pending') return;
|
|
const nowDate = this.now();
|
|
const todayDate = todayKstAsDate(nowDate);
|
|
const todayIso = todayKstAsIso(nowDate);
|
|
const candidates = parseAllCandidates(note.rawText, todayDate);
|
|
const vocab = this.repo.getTopUsedTags(20);
|
|
const res = await this.holder.get().generate({
|
|
text: note.rawText,
|
|
todayKst: todayIso,
|
|
dueDateCandidates: candidates,
|
|
vocab
|
|
});
|
|
// AI primary: AI's dueDate is final (no rule merge)
|
|
this.repo.updateAiResult(job.noteId, {
|
|
title: res.title,
|
|
summary: res.summary,
|
|
tags: res.tags,
|
|
provider: this.holder.get().name,
|
|
dueDate: res.dueDate ?? null
|
|
});
|
|
this.unreachableBackoffStep = 0; // 성공 시 step reset
|
|
this.logger.info('ai.done', {
|
|
noteId: job.noteId,
|
|
attempt,
|
|
dueDateSource: res.dueDate !== null ? 'ai' : 'none',
|
|
candidatesCount: candidates.length
|
|
});
|
|
if (this.telemetry) {
|
|
await this.telemetry.emit({
|
|
kind: 'ai_succeeded',
|
|
payload: {
|
|
noteId: job.noteId,
|
|
durationMs: this.now().getTime() - startMs,
|
|
attempts: attempt + 1
|
|
}
|
|
}).catch(() => {});
|
|
// v0.2.3 #3 — per-tag vocab hit/miss 분류 (updateAiResult 후 → tagId 보장)
|
|
// dedup: AI 응답에 같은 태그 중복 가능 — INSERT OR IGNORE 와 정합한 1-emit/태그 보장
|
|
const vocabSet = new Set(vocab);
|
|
for (const tagName of new Set(res.tags)) {
|
|
if (vocabSet.has(tagName)) {
|
|
const tagId = this.repo.getTagIdByName(tagName);
|
|
if (tagId !== null) {
|
|
await this.telemetry.emit({
|
|
kind: 'tag_vocab_hit',
|
|
payload: { tagId, vocabSize: vocab.length }
|
|
}).catch(() => {});
|
|
}
|
|
} else {
|
|
await this.telemetry.emit({
|
|
kind: 'tag_vocab_miss',
|
|
payload: { vocabSize: vocab.length }
|
|
}).catch(() => {});
|
|
}
|
|
}
|
|
}
|
|
this.emit(job.noteId);
|
|
return;
|
|
} catch (err) {
|
|
const reason = classifyReason(err);
|
|
const msg = (err as Error).message;
|
|
this.logger.warn('ai.retry', { noteId: job.noteId, attempt, err: msg, reason });
|
|
if (reason === 'unreachable' || reason === 'timeout') {
|
|
// 무한 retry: attempts 증가 안 함, in-place loop + sleep.
|
|
// markAiFailed / ai_failed emit 안 함 — ratio 통계는 schema/other 만 누적.
|
|
const sleepMs = this.nextBackoffMs(this.unreachableBackoffStep);
|
|
// step 이 cap 도달 후엔 인덱스 stay — increment 는 무의미하지만 안전한 no-op.
|
|
// (Math.min 가드: cap 넘어가도 length-1 로 묶임.)
|
|
if (this.unreachableBackoffStep < this.unreachableBackoffsMs.length - 1) {
|
|
this.unreachableBackoffStep += 1;
|
|
}
|
|
const nextRunAt = new Date(Date.now() + sleepMs).toISOString();
|
|
this.repo.setNextRunAt(job.noteId, nextRunAt, msg);
|
|
await this.sleep(sleepMs);
|
|
attempt -= 1; // for 루프 attempt++ 상쇄 — 같은 attempt 인덱스로 재시도
|
|
continue;
|
|
}
|
|
// schema / other: 기존 max 3 retry 정책
|
|
const isLast = attempt === max - 1;
|
|
const nextRunAt = new Date(Date.now() + (this.backoffsMs[attempt + 1] ?? 0)).toISOString();
|
|
this.repo.incrementJobAttempt(job.noteId, nextRunAt, msg);
|
|
if (isLast) {
|
|
this.repo.markAiFailed(job.noteId, msg);
|
|
this.logger.error('ai.failed', { noteId: job.noteId, err: msg });
|
|
if (this.telemetry) {
|
|
await this.telemetry.emit({
|
|
kind: 'ai_failed',
|
|
payload: {
|
|
noteId: job.noteId,
|
|
reason,
|
|
attempts: attempt + 1
|
|
}
|
|
}).catch(() => {});
|
|
}
|
|
this.emit(job.noteId);
|
|
return;
|
|
}
|
|
await this.sleep(this.backoffsMs[attempt + 1] ?? 0);
|
|
}
|
|
}
|
|
}
|
|
|
|
private nextBackoffMs(step: number): number {
|
|
const idx = Math.min(step, this.unreachableBackoffsMs.length - 1);
|
|
return this.unreachableBackoffsMs[idx]!;
|
|
}
|
|
|
|
private emit(noteId: string): void {
|
|
if (!this.onUpdate) return;
|
|
const note = this.repo.findById(noteId);
|
|
if (note) this.onUpdate(note);
|
|
}
|
|
|
|
private sleep(ms: number): Promise<void> {
|
|
if (ms <= 0) return Promise.resolve();
|
|
return new Promise((r) => setTimeout(r, ms));
|
|
}
|
|
}
|