84 lines
2.7 KiB
TypeScript
84 lines
2.7 KiB
TypeScript
import type { EditJobPayload } from '@dynamic-sites/shared';
|
|
import { logger } from '../logger.js';
|
|
|
|
/**
 * Contract for a queue of edit jobs that are handed to a single consumer.
 *
 * The notes below describe the behaviour of the `createEditQueue`
 * implementation in this file; other implementations may differ.
 */
export interface EditQueue {
  /**
   * Adds a job to the back of the queue.
   *
   * In `createEditQueue` this throws `Error('QUEUE_FULL')` when the queue is
   * at its configured maximum depth, and ignores the job (with a warning log)
   * once shutdown has begun.
   */
  enqueue(payload: EditJobPayload): void;

  /**
   * Registers the async function that handles jobs and begins consuming any
   * jobs that were enqueued before a consumer was available.
   */
  startConsumer(processor: (job: EditJobPayload) => Promise<void>): void;

  /** Returns the number of jobs waiting in the queue (not yet handed to the consumer). */
  getQueueDepth(): number;

  /**
   * Stops the queue. In `createEditQueue`, no further jobs are accepted and
   * jobs still waiting in the queue are discarded.
   */
  shutdown(): Promise<void>;
}
|
|
|
|
/**
 * Parses a raw queue-depth setting into an integer.
 *
 * An unset or empty value yields `fallback`. A value that `parseInt` cannot
 * read (e.g. `"abc"`) also yields `fallback`: letting `NaN` through would make
 * every `length >= limit` comparison false and silently disable the limit.
 *
 * @param raw - The raw setting, typically an environment variable value.
 * @param fallback - Depth to use when `raw` is missing or not numeric.
 * @returns The parsed integer, or `fallback`.
 */
function parseMaxQueueDepth(raw: string | undefined, fallback = 20): number {
  const parsed = parseInt(raw || String(fallback), 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

/** Maximum number of jobs allowed to wait in the queue; read once at module load. */
const MAX_QUEUE_DEPTH = parseMaxQueueDepth(process.env.MAX_QUEUE_DEPTH);
|
|
|
|
/**
 * Creates an in-memory FIFO queue of edit jobs processed one at a time.
 *
 * Jobs are buffered in an array and handed, in order, to the processor
 * registered through `startConsumer`. A job that rejects is logged and does
 * not stop the queue. At most `MAX_QUEUE_DEPTH` jobs may wait at once.
 *
 * @returns An `EditQueue` backed by state private to this call.
 */
export function createEditQueue(): EditQueue {
  const jobs: EditJobPayload[] = [];
  let processing = false;
  let shuttingDown = false;
  let processor: ((job: EditJobPayload) => Promise<void>) | null = null;
  // One resolver per shutdown() call that is waiting for the in-flight job.
  const shutdownWaiters: Array<() => void> = [];

  /**
   * Processes queued jobs sequentially until the queue is empty or shutdown
   * begins. Re-entrant calls return immediately while a drain is active.
   */
  async function drain(): Promise<void> {
    if (processing || processor === null) return;
    processing = true;

    try {
      while (jobs.length > 0 && !shuttingDown) {
        // Re-read each iteration so a processor replaced via startConsumer()
        // is used for subsequent jobs.
        const handler = processor;
        if (handler === null) break;

        const job = jobs.shift();
        if (job === undefined) break;

        logger.info({ event: 'job.started', kind: job.kind, id: job.id }, 'Processing job');
        try {
          await handler(job);
          logger.info({ event: 'job.completed', kind: job.kind, id: job.id }, 'Job completed');
        } catch (err: unknown) {
          // Thrown values are not guaranteed to be Error instances.
          const message = err instanceof Error ? err.message : String(err);
          logger.error({ event: 'job.failed', kind: job.kind, id: job.id, error: message }, 'Job failed');
        }
      }
    } finally {
      processing = false;
      // Release every pending shutdown() once the in-flight job has finished,
      // regardless of how many jobs were still queued.
      if (shuttingDown) {
        for (const release of shutdownWaiters.splice(0)) release();
      }
    }
  }

  return {
    enqueue(payload: EditJobPayload) {
      if (shuttingDown) {
        logger.warn({ event: 'job.rejected', reason: 'shutting_down' }, 'Rejecting job — shutting down');
        return;
      }
      if (jobs.length >= MAX_QUEUE_DEPTH) {
        logger.warn({ event: 'job.rejected', reason: 'queue_full', depth: jobs.length }, 'Queue depth exceeded');
        throw new Error('QUEUE_FULL');
      }
      jobs.push(payload);
      logger.info({ event: 'job.enqueued', kind: payload.kind, id: payload.id, depth: jobs.length }, 'Job enqueued');
      // Start draining on a later tick so enqueue() itself stays synchronous.
      if (processor) setImmediate(drain);
    },

    startConsumer(proc) {
      processor = proc;
      logger.info({ event: 'consumer.started' }, 'Edit queue consumer started');
      // Pick up jobs that were enqueued before a consumer was registered.
      if (jobs.length > 0) setImmediate(drain);
    },

    getQueueDepth() {
      return jobs.length;
    },

    async shutdown() {
      shuttingDown = true;
      const remaining = jobs.length;
      if (remaining > 0) {
        logger.warn({ event: 'consumer.shutdown', dropped: remaining }, `Shutting down with ${remaining} queued jobs`);
      }
      // Discard queued jobs before waiting: drain() stops taking new jobs once
      // shuttingDown is set, so nothing else would ever empty the queue.
      jobs.length = 0;
      if (processing) {
        // Wait for the job currently being processed to finish.
        await new Promise<void>((resolve) => {
          shutdownWaiters.push(resolve);
        });
      }
    },
  };
}
|