First cut
This commit is contained in:
83
server/src/queue/edit-queue.ts
Normal file
83
server/src/queue/edit-queue.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import type { EditJobPayload } from '@dynamic-sites/shared';
|
||||
import { logger } from '../logger.js';
|
||||
|
||||
export interface EditQueue {
|
||||
enqueue(payload: EditJobPayload): void;
|
||||
startConsumer(processor: (job: EditJobPayload) => Promise<void>): void;
|
||||
getQueueDepth(): number;
|
||||
shutdown(): Promise<void>;
|
||||
}
|
||||
|
||||
const MAX_QUEUE_DEPTH = parseInt(process.env.MAX_QUEUE_DEPTH || '20', 10);
|
||||
|
||||
export function createEditQueue(): EditQueue {
|
||||
const jobs: EditJobPayload[] = [];
|
||||
let processing = false;
|
||||
let shuttingDown = false;
|
||||
let processor: ((job: EditJobPayload) => Promise<void>) | null = null;
|
||||
let resolveShutdown: (() => void) | null = null;
|
||||
|
||||
async function drain() {
|
||||
if (processing) return;
|
||||
processing = true;
|
||||
|
||||
while (jobs.length > 0 && !shuttingDown) {
|
||||
const job = jobs.shift()!;
|
||||
logger.info({ event: 'job.started', kind: job.kind, id: job.id }, 'Processing job');
|
||||
try {
|
||||
await processor!(job);
|
||||
logger.info({ event: 'job.completed', kind: job.kind, id: job.id }, 'Job completed');
|
||||
} catch (err) {
|
||||
logger.error({ event: 'job.failed', kind: job.kind, id: job.id, error: (err as Error).message }, 'Job failed');
|
||||
}
|
||||
}
|
||||
|
||||
processing = false;
|
||||
|
||||
if (shuttingDown && jobs.length === 0 && resolveShutdown) {
|
||||
resolveShutdown();
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
enqueue(payload: EditJobPayload) {
|
||||
if (shuttingDown) {
|
||||
logger.warn({ event: 'job.rejected', reason: 'shutting_down' }, 'Rejecting job — shutting down');
|
||||
return;
|
||||
}
|
||||
if (jobs.length >= MAX_QUEUE_DEPTH) {
|
||||
logger.warn({ event: 'job.rejected', reason: 'queue_full', depth: jobs.length }, 'Queue depth exceeded');
|
||||
throw new Error('QUEUE_FULL');
|
||||
}
|
||||
jobs.push(payload);
|
||||
logger.info({ event: 'job.enqueued', kind: payload.kind, id: payload.id, depth: jobs.length }, 'Job enqueued');
|
||||
// Start draining on next tick
|
||||
if (processor) setImmediate(drain);
|
||||
},
|
||||
|
||||
startConsumer(proc) {
|
||||
processor = proc;
|
||||
logger.info({ event: 'consumer.started' }, 'Edit queue consumer started');
|
||||
// Start draining in case jobs were enqueued before consumer started
|
||||
if (jobs.length > 0) setImmediate(drain);
|
||||
},
|
||||
|
||||
getQueueDepth() {
|
||||
return jobs.length;
|
||||
},
|
||||
|
||||
async shutdown() {
|
||||
shuttingDown = true;
|
||||
const remaining = jobs.length;
|
||||
if (remaining > 0) {
|
||||
logger.warn({ event: 'consumer.shutdown', dropped: remaining }, `Shutting down with ${remaining} queued jobs`);
|
||||
}
|
||||
if (processing) {
|
||||
// Wait for current job to finish
|
||||
await new Promise<void>(resolve => { resolveShutdown = resolve; });
|
||||
}
|
||||
// Clear remaining jobs
|
||||
jobs.length = 0;
|
||||
},
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user