feat(telemetry): AiWorker emits ai_succeeded/ai_failed with reason (#7 v0.2.3)
This commit is contained in:
@@ -2,6 +2,7 @@ import type { NoteRepository } from '../repository/NoteRepository.js';
|
|||||||
import type { InferenceProvider } from './InferenceProvider.js';
|
import type { InferenceProvider } from './InferenceProvider.js';
|
||||||
import type { Note } from '@shared/types';
|
import type { Note } from '@shared/types';
|
||||||
import { parseAllCandidates } from '../services/dueDateParser.js';
|
import { parseAllCandidates } from '../services/dueDateParser.js';
|
||||||
|
import { ZodError } from 'zod';
|
||||||
|
|
||||||
const KST_OFFSET_MS = 9 * 60 * 60 * 1000;
|
const KST_OFFSET_MS = 9 * 60 * 60 * 1000;
|
||||||
|
|
||||||
@@ -15,6 +16,25 @@ function todayKstAsIso(now: Date): string {
|
|||||||
return todayKstAsDate(now).toISOString().slice(0, 10);
|
return todayKstAsDate(now).toISOString().slice(0, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function classifyReason(err: unknown): 'unreachable' | 'schema' | 'timeout' | 'other' {
|
||||||
|
if (err instanceof ZodError) return 'schema';
|
||||||
|
const msg = err instanceof Error ? err.message.toLowerCase() : String(err).toLowerCase();
|
||||||
|
if (msg.includes('econnrefused') || msg.includes('enotfound') || msg.includes('fetch failed') || msg.includes('econnreset') || msg.includes('unreachable')) {
|
||||||
|
return 'unreachable';
|
||||||
|
}
|
||||||
|
if (msg.includes('timeout') || msg.includes('timedout') || msg.includes('aborted')) {
|
||||||
|
return 'timeout';
|
||||||
|
}
|
||||||
|
return 'other';
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface AiTelemetryEmitter {
|
||||||
|
emit(input:
|
||||||
|
| { kind: 'ai_succeeded'; payload: { noteId: string; durationMs: number; attempts: number } }
|
||||||
|
| { kind: 'ai_failed'; payload: { noteId: string; reason: 'unreachable' | 'schema' | 'timeout' | 'other'; attempts: number } }
|
||||||
|
): Promise<void>;
|
||||||
|
}
|
||||||
|
|
||||||
export interface AiWorkerOptions {
|
export interface AiWorkerOptions {
|
||||||
backoffsMs?: number[];
|
backoffsMs?: number[];
|
||||||
onUpdate?: (note: Note) => void;
|
onUpdate?: (note: Note) => void;
|
||||||
@@ -24,6 +44,7 @@ export interface AiWorkerOptions {
|
|||||||
error: (msg: string, meta?: Record<string, unknown>) => void;
|
error: (msg: string, meta?: Record<string, unknown>) => void;
|
||||||
};
|
};
|
||||||
now?: () => Date;
|
now?: () => Date;
|
||||||
|
telemetry?: AiTelemetryEmitter;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Job { noteId: string; attempts: number; }
|
interface Job { noteId: string; attempts: number; }
|
||||||
@@ -36,6 +57,7 @@ export class AiWorker {
|
|||||||
private onUpdate?: (note: Note) => void;
|
private onUpdate?: (note: Note) => void;
|
||||||
private logger: NonNullable<AiWorkerOptions['logger']>;
|
private logger: NonNullable<AiWorkerOptions['logger']>;
|
||||||
private now: () => Date;
|
private now: () => Date;
|
||||||
|
private telemetry?: AiTelemetryEmitter;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private repo: NoteRepository,
|
private repo: NoteRepository,
|
||||||
@@ -46,6 +68,7 @@ export class AiWorker {
|
|||||||
this.onUpdate = opts.onUpdate;
|
this.onUpdate = opts.onUpdate;
|
||||||
this.logger = opts.logger ?? { info: () => {}, warn: () => {}, error: () => {} };
|
this.logger = opts.logger ?? { info: () => {}, warn: () => {}, error: () => {} };
|
||||||
this.now = opts.now ?? (() => new Date());
|
this.now = opts.now ?? (() => new Date());
|
||||||
|
this.telemetry = opts.telemetry;
|
||||||
}
|
}
|
||||||
|
|
||||||
async enqueue(noteId: string): Promise<void> {
|
async enqueue(noteId: string): Promise<void> {
|
||||||
@@ -95,6 +118,7 @@ export class AiWorker {
|
|||||||
private async processJob(job: Job): Promise<void> {
|
private async processJob(job: Job): Promise<void> {
|
||||||
const max = this.backoffsMs.length;
|
const max = this.backoffsMs.length;
|
||||||
for (let attempt = job.attempts; attempt < max; attempt++) {
|
for (let attempt = job.attempts; attempt < max; attempt++) {
|
||||||
|
const startMs = Date.now();
|
||||||
try {
|
try {
|
||||||
const note = this.repo.findById(job.noteId);
|
const note = this.repo.findById(job.noteId);
|
||||||
if (!note || note.aiStatus !== 'pending') return;
|
if (!note || note.aiStatus !== 'pending') return;
|
||||||
@@ -121,6 +145,16 @@ export class AiWorker {
|
|||||||
dueDateSource: res.dueDate !== null ? 'ai' : 'none',
|
dueDateSource: res.dueDate !== null ? 'ai' : 'none',
|
||||||
candidatesCount: candidates.length
|
candidatesCount: candidates.length
|
||||||
});
|
});
|
||||||
|
if (this.telemetry) {
|
||||||
|
await this.telemetry.emit({
|
||||||
|
kind: 'ai_succeeded',
|
||||||
|
payload: {
|
||||||
|
noteId: job.noteId,
|
||||||
|
durationMs: Date.now() - startMs,
|
||||||
|
attempts: attempt
|
||||||
|
}
|
||||||
|
}).catch(() => {});
|
||||||
|
}
|
||||||
this.emit(job.noteId);
|
this.emit(job.noteId);
|
||||||
return;
|
return;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
@@ -132,6 +166,16 @@ export class AiWorker {
|
|||||||
if (isLast) {
|
if (isLast) {
|
||||||
this.repo.markAiFailed(job.noteId, msg);
|
this.repo.markAiFailed(job.noteId, msg);
|
||||||
this.logger.error('ai.failed', { noteId: job.noteId, err: msg });
|
this.logger.error('ai.failed', { noteId: job.noteId, err: msg });
|
||||||
|
if (this.telemetry) {
|
||||||
|
await this.telemetry.emit({
|
||||||
|
kind: 'ai_failed',
|
||||||
|
payload: {
|
||||||
|
noteId: job.noteId,
|
||||||
|
reason: classifyReason(err),
|
||||||
|
attempts: attempt + 1
|
||||||
|
}
|
||||||
|
}).catch(() => {});
|
||||||
|
}
|
||||||
this.emit(job.noteId);
|
this.emit(job.noteId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -193,3 +193,86 @@ describe('AiWorker', () => {
|
|||||||
expect(captured.dueDateCandidates.length).toBe(2); // 내일 + 모레
|
expect(captured.dueDateCandidates.length).toBe(2); // 내일 + 모레
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('AiWorker telemetry emit', () => {
|
||||||
|
let db: Database.Database;
|
||||||
|
let repo: NoteRepository;
|
||||||
|
let events: Array<{ kind: string; payload: { noteId: string; durationMs?: number; reason?: string; attempts: number } }>;
|
||||||
|
const collectingTelemetry = {
|
||||||
|
emit: async (ev: { kind: string; payload: { noteId: string; durationMs?: number; reason?: string; attempts: number } }) => {
|
||||||
|
events.push(ev);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
db = new Database(':memory:');
|
||||||
|
runMigrations(db);
|
||||||
|
repo = new NoteRepository(db);
|
||||||
|
events = [];
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits ai_succeeded with durationMs/attempts on success', async () => {
|
||||||
|
const { id } = repo.create({ rawText: '수요일 회의 메모' });
|
||||||
|
const w = new AiWorker(repo, makeProvider(), {
|
||||||
|
backoffsMs: [0, 0, 0],
|
||||||
|
telemetry: collectingTelemetry
|
||||||
|
});
|
||||||
|
await w.enqueue(id);
|
||||||
|
await w.drain();
|
||||||
|
const succeeded = events.find((e) => e.kind === 'ai_succeeded');
|
||||||
|
expect(succeeded).toBeDefined();
|
||||||
|
expect(succeeded!.payload.noteId).toBe(id);
|
||||||
|
expect(succeeded!.payload.attempts).toBe(0);
|
||||||
|
expect(succeeded!.payload.durationMs).toBeGreaterThanOrEqual(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits ai_failed with reason=unreachable on network error', async () => {
|
||||||
|
const { id } = repo.create({ rawText: '메모' });
|
||||||
|
const provider = makeProvider({
|
||||||
|
generate: vi.fn(async () => { throw new Error('fetch failed: ECONNREFUSED 11434'); })
|
||||||
|
});
|
||||||
|
const w = new AiWorker(repo, provider, {
|
||||||
|
backoffsMs: [0, 0, 0],
|
||||||
|
telemetry: collectingTelemetry
|
||||||
|
});
|
||||||
|
await w.enqueue(id);
|
||||||
|
await w.drain();
|
||||||
|
const failed = events.find((e) => e.kind === 'ai_failed');
|
||||||
|
expect(failed).toBeDefined();
|
||||||
|
expect(failed!.payload.reason).toBe('unreachable');
|
||||||
|
expect(failed!.payload.attempts).toBe(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits ai_failed with reason=schema on zod failure', async () => {
|
||||||
|
const { id } = repo.create({ rawText: '메모' });
|
||||||
|
const { ZodError } = await import('zod');
|
||||||
|
const provider = makeProvider({
|
||||||
|
generate: vi.fn(async () => { throw new ZodError([]); })
|
||||||
|
});
|
||||||
|
const w = new AiWorker(repo, provider, {
|
||||||
|
backoffsMs: [0, 0, 0],
|
||||||
|
telemetry: collectingTelemetry
|
||||||
|
});
|
||||||
|
await w.enqueue(id);
|
||||||
|
await w.drain();
|
||||||
|
const failed = events.find((e) => e.kind === 'ai_failed');
|
||||||
|
expect(failed).toBeDefined();
|
||||||
|
expect(failed!.payload.reason).toBe('schema');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits ai_failed with reason=other on unrecognized error', async () => {
|
||||||
|
const { id } = repo.create({ rawText: '메모' });
|
||||||
|
const provider = makeProvider({
|
||||||
|
generate: vi.fn(async () => { throw new Error('mystery'); })
|
||||||
|
});
|
||||||
|
const w = new AiWorker(repo, provider, {
|
||||||
|
backoffsMs: [0, 0, 0],
|
||||||
|
telemetry: collectingTelemetry
|
||||||
|
});
|
||||||
|
await w.enqueue(id);
|
||||||
|
await w.drain();
|
||||||
|
const failed = events.find((e) => e.kind === 'ai_failed');
|
||||||
|
expect(failed).toBeDefined();
|
||||||
|
expect(failed!.payload.reason).toBe('other');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user