Files
inkling/src/main/ai/AiWorker.ts
altair823 9fef2edb6e feat(ollama): ProviderHolder + AiWorker/HealthChecker refactor (v0.2.3.1)
- ProviderHolder: mutable holder + listeners, indirection layer
- AiWorker: constructor InferenceProvider → ProviderHolder
  this.provider.x → this.holder.get().x 전환
- HealthChecker: 동일 패턴
- src/main/index.ts: provider 를 ProviderHolder 로 감싸서 생성
- 기존 AiWorker / HealthChecker 테스트의 constructor 호출에 ProviderHolder wrap
- 단위 +2 cases (ProviderHolder)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 23:32:20 +09:00

250 lines
9.1 KiB
TypeScript

import type { NoteRepository } from '../repository/NoteRepository.js';
import type { Note } from '@shared/types';
import { ProviderHolder } from './ProviderHolder.js';
import { parseAllCandidates } from '../services/dueDateParser.js';
import { ZodError } from 'zod';
/** Offset added to a UTC timestamp to obtain KST wall-clock time: 9 hours, in milliseconds. */
const KST_OFFSET_MS = 9 * 3_600_000;

/**
 * Maps an instant to "today in KST", expressed as a Date at UTC midnight.
 *
 * The instant is shifted forward by the KST offset so that its UTC fields read as the
 * KST calendar date; the time-of-day is then dropped.
 *
 * @param now - The instant to convert.
 * @returns A Date whose UTC year/month/day equal the KST calendar date of `now`.
 */
function todayKstAsDate(now: Date): Date {
  const shifted = new Date(now.getTime() + KST_OFFSET_MS);
  const year = shifted.getUTCFullYear();
  const month = shifted.getUTCMonth();
  const day = shifted.getUTCDate();
  return new Date(Date.UTC(year, month, day));
}
/**
 * Formats "today in KST" as the first ten characters of the ISO-8601 string
 * (`YYYY-MM-DD` for ordinary four-digit years).
 *
 * @param now - The instant to convert.
 * @returns The KST calendar date of `now` as a date-only string.
 */
function todayKstAsIso(now: Date): string {
  const isoTimestamp = todayKstAsDate(now).toISOString();
  return isoTimestamp.substring(0, 10);
}
/**
 * Buckets a thrown value into one of the failure categories used for retry decisions.
 *
 * - `schema`: the value is a `ZodError` (checked first, regardless of its message).
 * - `unreachable`: the lower-cased message mentions a connection-level failure.
 * - `timeout`: the lower-cased message mentions a timeout or an abort.
 * - `other`: anything else.
 *
 * Non-Error values are stringified with `String(err)` before matching. When a message
 * matches both lists, `unreachable` wins because it is tested first.
 *
 * @param err - Whatever was caught.
 * @returns The failure category.
 */
function classifyReason(err: unknown): 'unreachable' | 'schema' | 'timeout' | 'other' {
  if (err instanceof ZodError) {
    return 'schema';
  }

  const text = (err instanceof Error ? err.message : String(err)).toLowerCase();
  const mentionsAny = (needles: readonly string[]): boolean =>
    needles.some((needle) => text.includes(needle));

  if (mentionsAny(['econnrefused', 'enotfound', 'fetch failed', 'econnreset', 'unreachable'])) {
    return 'unreachable';
  }
  if (mentionsAny(['timeout', 'timedout', 'aborted'])) {
    return 'timeout';
  }
  return 'other';
}
/**
 * Sink for telemetry events reported by the AI worker.
 *
 * `emit` takes one event object, discriminated by `kind`, and returns a promise.
 * The `reason` values of `ai_failed` are the same four categories that
 * `classifyReason` returns.
 */
export interface AiTelemetryEmitter {
emit(input:
// A note was processed successfully: elapsed time in ms and the attempt count.
| { kind: 'ai_succeeded'; payload: { noteId: string; durationMs: number; attempts: number } }
// A note was given up on: failure category and the attempt count.
| { kind: 'ai_failed'; payload: { noteId: string; reason: 'unreachable' | 'schema' | 'timeout' | 'other'; attempts: number } }
// A returned tag was already part of the vocabulary: its id and the vocabulary size.
| { kind: 'tag_vocab_hit'; payload: { tagId: number; vocabSize: number } }
// A returned tag was not part of the vocabulary: only the vocabulary size is reported.
| { kind: 'tag_vocab_miss'; payload: { vocabSize: number } }
): Promise<void>;
}
/**
 * Optional tuning and dependency-injection knobs for `AiWorker`.
 * Every field may be omitted; the defaults noted below are applied in the constructor.
 */
export interface AiWorkerOptions {
// Delays (ms) for the attempt-limited retry path; the array length is also the
// maximum number of attempts. Default: [0, 30_000, 120_000].
backoffsMs?: number[];
// Delays (ms) for the unlimited unreachable/timeout retry path; the last entry is
// reused once the end is reached. Default: [30_000, 60_000, 120_000, 240_000, 480_000, 900_000].
unreachableBackoffsMs?: number[];
// Called with the note re-read from the repository after a job succeeds or finally fails.
onUpdate?: (note: Note) => void;
// Structured logger; defaults to no-op functions.
logger?: {
info: (msg: string, meta?: Record<string, unknown>) => void;
warn: (msg: string, meta?: Record<string, unknown>) => void;
error: (msg: string, meta?: Record<string, unknown>) => void;
};
// Clock used for the KST "today" value and for duration measurement; defaults to `() => new Date()`.
now?: () => Date;
// Telemetry sink; when omitted no telemetry events are emitted.
telemetry?: AiTelemetryEmitter;
}
/** One queued unit of work: the note to process and how many attempts it has already used. */
interface Job { noteId: string; attempts: number; }
/**
 * Background worker that runs AI inference for notes, one job at a time in FIFO order.
 *
 * For each job the worker asks the provider currently exposed by the `ProviderHolder`
 * to generate a title, summary, tags and due date, stores the result through the
 * repository and then notifies `onUpdate`.
 *
 * Retry policy:
 * - `unreachable` / `timeout` failures are retried without limit and without consuming an
 *   attempt, sleeping according to `unreachableBackoffsMs`.
 * - every other failure (`schema` / `other`) consumes an attempt; once
 *   `backoffsMs.length` attempts are used the note is marked failed.
 */
export class AiWorker {
  /** Jobs waiting to run, in FIFO order. */
  private readonly queue: Job[] = [];
  /** True while `loop()` is active; prevents a second concurrent loop. */
  private running = false;
  /** Resolvers of pending `drain()` calls, released when the worker goes idle. */
  private readonly drainResolvers: Array<() => void> = [];
  private readonly backoffsMs: number[];
  private readonly unreachableBackoffsMs: number[];
  /** Index into `unreachableBackoffsMs`; shared by all jobs and reset to 0 after a success. */
  private unreachableBackoffStep = 0;
  private readonly onUpdate?: (note: Note) => void;
  private readonly logger: NonNullable<AiWorkerOptions['logger']>;
  private readonly now: () => Date;
  private readonly telemetry?: AiTelemetryEmitter;

  /**
   * @param repo - Repository used to read notes and persist results / job state.
   * @param holder - Indirection through which the current inference provider is obtained.
   * @param opts - Optional tuning; see `AiWorkerOptions` for defaults.
   */
  constructor(
    private repo: NoteRepository,
    private holder: ProviderHolder,
    opts: AiWorkerOptions = {}
  ) {
    this.backoffsMs = opts.backoffsMs ?? [0, 30_000, 120_000];
    this.unreachableBackoffsMs =
      opts.unreachableBackoffsMs ?? [30_000, 60_000, 120_000, 240_000, 480_000, 900_000];
    this.onUpdate = opts.onUpdate;
    this.logger = opts.logger ?? { info: () => {}, warn: () => {}, error: () => {} };
    this.now = opts.now ?? (() => new Date());
    this.telemetry = opts.telemetry;
  }

  /** Queues a fresh job (zero attempts used) for `noteId` and starts the loop if idle. */
  async enqueue(noteId: string): Promise<void> {
    this.queue.push({ noteId, attempts: 0 });
    this.kick();
  }

  /** Re-queues every pending job reported by the repository, keeping its attempt count. */
  async loadFromDb(): Promise<void> {
    for (const j of this.repo.getAllPendingJobs()) {
      this.queue.push({ noteId: j.noteId, attempts: j.attempts });
    }
    this.kick();
  }

  /** Resolves once no job is running and the queue is empty. */
  async drain(): Promise<void> {
    if (!this.running && this.queue.length === 0) return;
    await new Promise<void>((resolve) => {
      this.drainResolvers.push(resolve);
      this.kick();
    });
  }

  /** Starts the processing loop unless it is already running; releases drainers if idle. */
  private kick(): void {
    if (this.running) return;
    if (this.queue.length === 0) {
      this.resolveDrainers();
      return;
    }
    this.running = true;
    void this.loop();
  }

  /** Processes queued jobs sequentially until the queue is empty. */
  private async loop(): Promise<void> {
    try {
      while (this.queue.length > 0) {
        const job = this.queue.shift();
        if (job === undefined) break;
        try {
          await this.processJob(job);
        } catch (err) {
          // processJob deals with provider failures itself, so reaching this point means
          // a repository call, the logger, or the onUpdate callback threw. Log and carry on:
          // otherwise the remaining jobs would be stranded and the fire-and-forget
          // `void this.loop()` would surface as an unhandled promise rejection.
          this.logger.error('ai.job_crashed', {
            noteId: job.noteId,
            err: AiWorker.describeError(err)
          });
        }
      }
    } finally {
      this.running = false;
      this.resolveDrainers();
    }
  }

  /** Releases every pending `drain()` caller. */
  private resolveDrainers(): void {
    const r = this.drainResolvers.splice(0);
    for (const fn of r) fn();
  }

  /**
   * Runs one job to completion: success, final failure, or skip (note missing, deleted,
   * or no longer pending).
   *
   * Errors thrown by inference or by storing the result are handled here according to
   * the retry policy. Errors thrown after the result has been stored are rethrown.
   */
  private async processJob(job: Job): Promise<void> {
    // `max` caps only the schema/other branch (the one that consumes attempts).
    // The unreachable/timeout branch undoes the loop increment, so it retries without limit.
    const max = this.backoffsMs.length;
    for (let attempt = job.attempts; attempt < max; attempt++) {
      const startMs = this.now().getTime();
      // Set once the AI result is persisted. Anything thrown afterwards is a bookkeeping
      // problem, not an inference failure, and must not trigger a retry or markAiFailed
      // on a note that already has its result.
      let stored = false;
      try {
        const note = this.repo.findById(job.noteId);
        if (!note || note.deletedAt !== null || note.aiStatus !== 'pending') return;
        const nowDate = this.now();
        const todayDate = todayKstAsDate(nowDate);
        const todayIso = todayKstAsIso(nowDate);
        const candidates = parseAllCandidates(note.rawText, todayDate);
        const vocab = this.repo.getTopUsedTags(20);
        // Capture the provider once: the holder may be pointed at a different provider
        // while generate() is in flight, and the stored name must match the provider
        // that actually produced the result.
        const provider = this.holder.get();
        const res = await provider.generate({
          text: note.rawText,
          todayKst: todayIso,
          dueDateCandidates: candidates,
          vocab
        });
        // AI primary: the AI's dueDate is final (no merge with rule-based candidates).
        const dueDate = res.dueDate ?? null;
        this.repo.updateAiResult(job.noteId, {
          title: res.title,
          summary: res.summary,
          tags: res.tags,
          provider: provider.name,
          dueDate
        });
        stored = true;
        this.unreachableBackoffStep = 0; // reset the unreachable backoff after any success
        this.logger.info('ai.done', {
          noteId: job.noteId,
          attempt,
          // Derived from the stored value so a missing (undefined) dueDate is not reported as 'ai'.
          dueDateSource: dueDate !== null ? 'ai' : 'none',
          candidatesCount: candidates.length
        });
        if (this.telemetry) {
          await this.telemetry.emit({
            kind: 'ai_succeeded',
            payload: {
              noteId: job.noteId,
              durationMs: this.now().getTime() - startMs,
              attempts: attempt + 1
            }
          }).catch(() => {});
          // Per-tag vocab hit/miss classification; runs after updateAiResult so that
          // tag ids can be looked up. The response may repeat a tag, so tags are
          // de-duplicated to emit at most one event per distinct tag.
          const vocabSet = new Set(vocab);
          for (const tagName of new Set(res.tags)) {
            if (vocabSet.has(tagName)) {
              const tagId = this.repo.getTagIdByName(tagName);
              if (tagId !== null) {
                await this.telemetry.emit({
                  kind: 'tag_vocab_hit',
                  payload: { tagId, vocabSize: vocab.length }
                }).catch(() => {});
              }
            } else {
              await this.telemetry.emit({
                kind: 'tag_vocab_miss',
                payload: { vocabSize: vocab.length }
              }).catch(() => {});
            }
          }
        }
        this.emit(job.noteId);
        return;
      } catch (err) {
        if (stored) throw err;
        const reason = classifyReason(err);
        // Thrown values are not guaranteed to be Error instances; never persist `undefined`.
        const msg = AiWorker.describeError(err);
        this.logger.warn('ai.retry', { noteId: job.noteId, attempt, err: msg, reason });
        if (reason === 'unreachable' || reason === 'timeout') {
          // Unlimited retry: attempts are not consumed; sleep and retry in place.
          // No markAiFailed / ai_failed event here — only schema/other failures are counted.
          const sleepMs = this.nextBackoffMs(this.unreachableBackoffStep);
          // Advance the step until it reaches the last entry, then stay there.
          if (this.unreachableBackoffStep < this.unreachableBackoffsMs.length - 1) {
            this.unreachableBackoffStep += 1;
          }
          // Wall clock (Date.now), not the injectable clock: the sleep below uses a real timer.
          const nextRunAt = new Date(Date.now() + sleepMs).toISOString();
          this.repo.setNextRunAt(job.noteId, nextRunAt, msg);
          await this.sleep(sleepMs);
          attempt -= 1; // cancels the loop's attempt++ so the same attempt index is retried
          continue;
        }
        // schema / other: attempt-limited retry.
        const isLast = attempt === max - 1;
        const nextRunAt = new Date(Date.now() + (this.backoffsMs[attempt + 1] ?? 0)).toISOString();
        this.repo.incrementJobAttempt(job.noteId, nextRunAt, msg);
        if (isLast) {
          this.repo.markAiFailed(job.noteId, msg);
          this.logger.error('ai.failed', { noteId: job.noteId, err: msg });
          if (this.telemetry) {
            await this.telemetry.emit({
              kind: 'ai_failed',
              payload: {
                noteId: job.noteId,
                reason,
                attempts: attempt + 1
              }
            }).catch(() => {});
          }
          this.emit(job.noteId);
          return;
        }
        await this.sleep(this.backoffsMs[attempt + 1] ?? 0);
      }
    }
  }

  /** Returns the unreachable-path delay for `step`, clamped to the last configured entry. */
  private nextBackoffMs(step: number): number {
    const idx = Math.min(step, this.unreachableBackoffsMs.length - 1);
    return this.unreachableBackoffsMs[idx]!;
  }

  /** Re-reads the note and hands it to `onUpdate`, if a callback was configured. */
  private emit(noteId: string): void {
    if (!this.onUpdate) return;
    const note = this.repo.findById(noteId);
    if (note) this.onUpdate(note);
  }

  /** Waits `ms` milliseconds; resolves immediately for non-positive values. */
  private sleep(ms: number): Promise<void> {
    if (ms <= 0) return Promise.resolve();
    return new Promise((r) => setTimeout(r, ms));
  }

  /** Extracts a loggable/persistable message from an arbitrary thrown value. */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }
}