feat(retry): AiWorker unreachable/timeout 무한 retry — 15분 cap (#2 v0.2.3)
This commit is contained in:
@@ -37,6 +37,7 @@ export interface AiTelemetryEmitter {
|
||||
|
||||
export interface AiWorkerOptions {
|
||||
backoffsMs?: number[];
|
||||
unreachableBackoffsMs?: number[];
|
||||
onUpdate?: (note: Note) => void;
|
||||
logger?: {
|
||||
info: (msg: string, meta?: Record<string, unknown>) => void;
|
||||
@@ -54,6 +55,8 @@ export class AiWorker {
|
||||
private running = false;
|
||||
private drainResolvers: Array<() => void> = [];
|
||||
private backoffsMs: number[];
|
||||
private unreachableBackoffsMs: number[];
|
||||
private unreachableBackoffStep = 0;
|
||||
private onUpdate?: (note: Note) => void;
|
||||
private logger: NonNullable<AiWorkerOptions['logger']>;
|
||||
private now: () => Date;
|
||||
@@ -65,6 +68,7 @@ export class AiWorker {
|
||||
opts: AiWorkerOptions = {}
|
||||
) {
|
||||
this.backoffsMs = opts.backoffsMs ?? [0, 30_000, 120_000];
|
||||
this.unreachableBackoffsMs = opts.unreachableBackoffsMs ?? [30_000, 60_000, 120_000, 240_000, 480_000, 900_000];
|
||||
this.onUpdate = opts.onUpdate;
|
||||
this.logger = opts.logger ?? { info: () => {}, warn: () => {}, error: () => {} };
|
||||
this.now = opts.now ?? (() => new Date());
|
||||
@@ -139,6 +143,7 @@ export class AiWorker {
|
||||
provider: this.provider.name,
|
||||
dueDate: res.dueDate ?? null
|
||||
});
|
||||
this.unreachableBackoffStep = 0; // 성공 시 step reset
|
||||
this.logger.info('ai.done', {
|
||||
noteId: job.noteId,
|
||||
attempt,
|
||||
@@ -158,9 +163,22 @@ export class AiWorker {
|
||||
this.emit(job.noteId);
|
||||
return;
|
||||
} catch (err) {
|
||||
const isLast = attempt === max - 1;
|
||||
const reason = classifyReason(err);
|
||||
const msg = (err as Error).message;
|
||||
this.logger.warn('ai.retry', { noteId: job.noteId, attempt, err: msg });
|
||||
this.logger.warn('ai.retry', { noteId: job.noteId, attempt, err: msg, reason });
|
||||
if (reason === 'unreachable' || reason === 'timeout') {
|
||||
// 무한 retry: attempts 증가 안 함, in-place loop + sleep.
|
||||
// markAiFailed / ai_failed emit 안 함 — ratio 통계는 schema/other 만 누적.
|
||||
const sleepMs = this.nextBackoffMs(this.unreachableBackoffStep);
|
||||
this.unreachableBackoffStep = Math.min(this.unreachableBackoffStep + 1, this.unreachableBackoffsMs.length - 1);
|
||||
const nextRunAt = new Date(Date.now() + sleepMs).toISOString();
|
||||
this.repo.setNextRunAt(job.noteId, nextRunAt, msg);
|
||||
await this.sleep(sleepMs);
|
||||
attempt -= 1; // for 루프 attempt++ 상쇄 — 같은 attempt 인덱스로 재시도
|
||||
continue;
|
||||
}
|
||||
// schema / other: 기존 max 3 retry 정책
|
||||
const isLast = attempt === max - 1;
|
||||
const nextRunAt = new Date(Date.now() + (this.backoffsMs[attempt + 1] ?? 0)).toISOString();
|
||||
this.repo.incrementJobAttempt(job.noteId, nextRunAt, msg);
|
||||
if (isLast) {
|
||||
@@ -171,7 +189,7 @@ export class AiWorker {
|
||||
kind: 'ai_failed',
|
||||
payload: {
|
||||
noteId: job.noteId,
|
||||
reason: classifyReason(err),
|
||||
reason,
|
||||
attempts: attempt + 1
|
||||
}
|
||||
}).catch(() => {});
|
||||
@@ -184,6 +202,11 @@ export class AiWorker {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Looks up the delay, in milliseconds, for a given step of the
 * `unreachableBackoffsMs` schedule.
 *
 * Steps past the end of the schedule are clamped to the last entry, so the
 * final entry acts as the cap. Negative, fractional or NaN steps are
 * normalised to a valid index instead of indexing outside the array.
 *
 * @param step - zero-based position in the backoff schedule.
 * @returns the delay in milliseconds; always a defined number.
 */
private nextBackoffMs(step: number): number {
  // Used only when the configured schedule is empty, where any index lookup
  // would yield `undefined`. 900_000 ms is a deliberately long, conservative
  // delay so an empty schedule can never turn the retry path into a tight loop.
  const emptyScheduleFallbackMs = 900_000;
  const schedule = this.unreachableBackoffsMs;
  const lastIdx = schedule.length - 1;
  // NaN would poison Math.min/Math.max, so treat it as the first step.
  const wanted = Number.isNaN(step) ? 0 : Math.floor(step);
  const idx = Math.min(Math.max(wanted, 0), lastIdx);
  // When the schedule is empty `idx` is -1 and the lookup is undefined.
  return schedule[idx] ?? emptyScheduleFallbackMs;
}
|
||||
|
||||
private emit(noteId: string): void {
|
||||
if (!this.onUpdate) return;
|
||||
const note = this.repo.findById(noteId);
|
||||
|
||||
@@ -228,21 +228,21 @@ describe('AiWorker telemetry emit', () => {
|
||||
expect(succeeded!.payload.durationMs).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
it('emits ai_failed with reason=unreachable on network error', async () => {
|
||||
it('unreachable error — ai_failed NOT emitted (infinite retry, no markAiFailed)', async () => {
|
||||
const { id } = repo.create({ rawText: '메모' });
|
||||
const provider = makeProvider({
|
||||
generate: vi.fn(async () => { throw new Error('fetch failed: ECONNREFUSED 11434'); })
|
||||
});
|
||||
const w = new AiWorker(repo, provider, {
|
||||
backoffsMs: [0, 0, 0],
|
||||
unreachableBackoffsMs: [10, 10, 10, 10, 10, 10],
|
||||
telemetry: collectingTelemetry
|
||||
});
|
||||
await w.enqueue(id);
|
||||
await w.drain();
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
const failed = events.find((e) => e.kind === 'ai_failed');
|
||||
expect(failed).toBeDefined();
|
||||
expect(failed!.payload.reason).toBe('unreachable');
|
||||
expect(failed!.payload.attempts).toBe(3);
|
||||
expect(failed).toBeUndefined();
|
||||
expect(repo.findById(id)!.aiStatus).toBe('pending');
|
||||
});
|
||||
|
||||
it('emits ai_failed with reason=schema on zod failure', async () => {
|
||||
@@ -304,3 +304,119 @@ describe('AiWorker — deletedAt guard (v0.2.3 #4)', () => {
|
||||
expect(repo.findById(id)!.aiStatus).toBe('pending');
|
||||
});
|
||||
});
|
||||
|
||||
describe('AiWorker — unreachable/timeout infinite retry (v0.2.3 #2)', () => {
|
||||
let db: Database.Database;
|
||||
let repo: NoteRepository;
|
||||
|
||||
// Each test starts from a fresh in-memory database with migrations applied,
// and a repository bound to that database.
// NOTE(review): no matching teardown that closes `db` is visible in this
// describe block — confirm whether one is needed.
beforeEach(function setUpFreshRepository() {
  db = new Database(':memory:');
  runMigrations(db);
  repo = new NoteRepository(db);
});
|
||||
|
||||
// An unreachable provider must leave the note pending: the worker keeps
// retrying, the note is not marked failed, and the job's attempt counter
// stays at 0.
it('unreachable — markAiFailed 안 호출, attempts 증가 안 함', async () => {
  // Keep a typed handle on the mock so its call log can be read without a cast.
  const generate = vi.fn(async () => {
    throw new Error('ECONNREFUSED');
  });
  const provider = makeProvider({ generate });
  const w = new AiWorker(repo, provider, {
    backoffsMs: [0, 30_000, 120_000],
    unreachableBackoffsMs: [10, 10, 10, 10, 10, 10]
  });
  const { id } = repo.create({ rawText: 'x' });
  await w.enqueue(id);
  // Infinite retry — drain() never resolves, so wait briefly and then verify.
  await new Promise((r) => setTimeout(r, 200));
  expect(repo.findById(id)!.aiStatus).toBe('pending');
  // At least two calls proves a retry happened (and implies "was called").
  expect(generate.mock.calls.length).toBeGreaterThanOrEqual(2);
  const job = repo.getAllPendingJobs().find((j) => j.noteId === id);
  // Fail with a clear assertion, not a TypeError, if the job row is missing.
  expect(job).toBeDefined();
  expect(job!.attempts).toBe(0);
});
|
||||
|
||||
// A timeout error is expected to behave like an unreachable one: the note
// stays pending while the provider keeps being called.
it('timeout — unreachable 동일 (Q2=A)', async () => {
  const alwaysTimesOut = vi.fn(async () => {
    throw new Error('Request timeout');
  });
  const worker = new AiWorker(repo, makeProvider({ generate: alwaysTimesOut }), {
    backoffsMs: [0, 30_000, 120_000],
    unreachableBackoffsMs: [10, 10, 10, 10, 10, 10]
  });
  const note = repo.create({ rawText: 'x' });
  await worker.enqueue(note.id);
  // drain() is not awaited here; give the retry loop a short window to run.
  await new Promise((resolve) => setTimeout(resolve, 200));
  const status = repo.findById(note.id)!.aiStatus;
  expect(status).toBe('pending');
  expect(alwaysTimesOut.mock.calls.length).toBeGreaterThanOrEqual(2);
});
|
||||
|
||||
// A provider that always throws a ZodError is tried three times, after which
// the note is marked failed and an `ai_failed` event with reason "schema" is
// emitted.
it('schema fail max 3 — markAiFailed + ai_failed emit (reason=schema)', async () => {
  const { ZodError } = await import('zod');
  const rejectWithSchemaError = vi.fn(async () => {
    throw new ZodError([{ code: 'custom', message: 'bad', path: [] } as any]);
  });
  const provider = makeProvider({ generate: rejectWithSchemaError });
  const emitted: any[] = [];
  const worker = new AiWorker(repo, provider, {
    backoffsMs: [0, 0, 0],
    telemetry: {
      emit: async (event) => {
        emitted.push(event);
      }
    }
  });
  const note = repo.create({ rawText: 'x' });
  await worker.enqueue(note.id);
  await worker.drain();

  expect(repo.findById(note.id)!.aiStatus).toBe('failed');
  expect(rejectWithSchemaError.mock.calls.length).toBe(3);

  const failure = emitted.find((event) => event.kind === 'ai_failed');
  expect(failure).toBeDefined();
  expect(failure.payload.reason).toBe('schema');
});
|
||||
|
||||
// A provider that always throws an unclassified error is tried three times,
// after which the note is marked failed and an `ai_failed` event with reason
// "other" is emitted.
it('other fail max 3 — markAiFailed + ai_failed emit (reason=other)', async () => {
  const generate = vi.fn(async () => {
    throw new Error('something weird');
  });
  const provider = makeProvider({ generate });
  const events: any[] = [];
  const w = new AiWorker(repo, provider, {
    backoffsMs: [0, 0, 0],
    telemetry: { emit: async (e) => { events.push(e); } }
  });
  const { id } = repo.create({ rawText: 'x' });
  await w.enqueue(id);
  await w.drain();
  expect(repo.findById(id)!.aiStatus).toBe('failed');
  // The title promises "max 3": pin the attempt count, as the schema test does.
  expect(generate.mock.calls.length).toBe(3);
  const failed = events.find((e) => e.kind === 'ai_failed');
  // Guard first, so a missing event fails as an assertion and not as a
  // TypeError on `failed.payload`.
  expect(failed).toBeDefined();
  expect(failed.payload.reason).toBe('other');
});
|
||||
|
||||
// Checks the step -> delay lookup directly, including that a step beyond the
// end of the schedule is capped at the last entry (900_000 ms).
it('unreachable backoff schedule — nextBackoffMs(step) cap at index 5 (15분)', async () => {
  const worker = new AiWorker(repo, makeProvider(), {
    backoffsMs: [0, 30_000, 120_000],
    unreachableBackoffsMs: [30_000, 60_000, 120_000, 240_000, 480_000, 900_000]
  });
  // [step, expected delay in ms]
  const expectedDelayByStep: Array<[number, number]> = [
    [0, 30_000],
    [2, 120_000],
    [5, 900_000],
    [10, 900_000] // past the end of the schedule -> capped
  ];
  for (const [step, delayMs] of expectedDelayByStep) {
    // nextBackoffMs is private, hence the cast.
    expect((worker as any).nextBackoffMs(step)).toBe(delayMs);
  }
});
|
||||
|
||||
// The provider fails twice with a connection error and then succeeds; after
// the successful run the worker's unreachable backoff step must be back at 0.
it('success 후 unreachableBackoffStep reset', async () => {
  let callCount = 0;
  const failTwiceThenSucceed = vi.fn(async (): Promise<AiResponse> => {
    callCount += 1;
    if (callCount > 2) {
      return { title: 't', summary: 's', tags: [], dueDate: null };
    }
    throw new Error('ECONNREFUSED');
  });
  const provider = makeProvider({ generate: failTwiceThenSucceed });
  const worker = new AiWorker(repo, provider, {
    backoffsMs: [0, 0, 0],
    unreachableBackoffsMs: [10, 10, 10, 10, 10, 10]
  });
  const note = repo.create({ rawText: 'x' });
  await worker.enqueue(note.id);
  await worker.drain();

  expect(repo.findById(note.id)!.aiStatus).toBe('done');
  expect(callCount).toBe(3);
  // unreachableBackoffStep is private, hence the cast.
  expect((worker as any).unreachableBackoffStep).toBe(0);
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user