import PQueue from 'p-queue'; import { generateText } from 'ai'; import { createAnthropic } from '@ai-sdk/anthropic'; import { createOpenAI } from '@ai-sdk/openai'; import { createGoogleGenerativeAI } from '@ai-sdk/google'; import { createOpenAICompatible } from '@ai-sdk/openai-compatible'; import { Job } from '../db/models.js'; import { broadcast } from '../ws/broadcast.js'; import { findModel, DEFAULT_MODEL_ID } from '../models.js';

// ---------------------------------------------------------------------------
// Provider instances
// ---------------------------------------------------------------------------
const anthropic = createAnthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

const openai = createOpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

const google = createGoogleGenerativeAI({
  apiKey: process.env.GOOGLE_API_KEY,
});

// Ollama Cloud exposes an OpenAI-compatible /v1 endpoint.
// Using @ai-sdk/openai-compatible avoids the local-Ollama schema validation
// in ollama-ai-provider which requires fields (eval_duration etc.) that
// Ollama Cloud doesn't return.
const ollamaCloud = createOpenAICompatible({
  name: 'ollama-cloud',
  baseURL: 'https://ollama.com/v1',
  headers: {
    Authorization: `Bearer ${process.env.OLLAMA_API_KEY ?? ''}`,
  },
});

// Maps a provider key (from the model registry) to a factory that returns a
// ready-to-use language-model instance for a given model id.
const PROVIDERS = {
  anthropic: (id) => anthropic(id),
  openai: (id) => openai(id),
  google: (id) => google(id),
  ollama: (id) => ollamaCloud(id),
};

/**
 * Resolve a model id from the registry into an AI SDK model instance,
 * falling back to DEFAULT_MODEL_ID when the id is unknown.
 *
 * @param {string} modelId - Registry model id (see ../models.js).
 * @returns {object} A provider-bound language-model instance.
 * @throws {Error} If neither the requested model nor the default can be
 *   found, or if the model's provider has no registered factory. (The
 *   original code crashed with an opaque TypeError in these cases.)
 */
function resolveModel(modelId) {
  const meta = findModel(modelId) ?? findModel(DEFAULT_MODEL_ID);
  if (!meta) {
    throw new Error(
      `Unknown model "${modelId}" and default model "${DEFAULT_MODEL_ID}" not found in registry`,
    );
  }
  const factory = PROVIDERS[meta.provider];
  if (!factory) {
    throw new Error(`No provider registered for "${meta.provider}" (model "${meta.id}")`);
  }
  return factory(meta.id);
}

// ---------------------------------------------------------------------------
// p-queue: in-process queue, no external server needed
// ---------------------------------------------------------------------------
export const queue = new PQueue({
  concurrency: parseInt(process.env.JOB_CONCURRENCY ?? '3', 10),
});

// Push queue-depth updates to connected clients on every state transition.
queue.on('add', () => broadcastQueueStats());
queue.on('next', () => broadcastQueueStats());
queue.on('idle', () => broadcastQueueStats());

// p-queue naming: `size` = tasks waiting, `pending` = tasks currently running.
function broadcastQueueStats() {
  broadcast({ type: 'queue_stats', pending: queue.size, running: queue.pending });
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Persist a job status change (plus any extra columns) and broadcast the
 * updated job record to WebSocket clients.
 *
 * @param {Job} job - Sequelize job instance.
 * @param {string} status - New status value ('running' | 'done' | 'error').
 * @param {object} [extra] - Additional columns to update alongside status.
 */
async function setStatus(job, status, extra = {}) {
  await job.update({ status, ...extra });
  broadcast({ type: 'job_update', job: job.toJSON() });
}

/**
 * Build a human-readable description of an error, including its cause when
 * present. JSON.stringify can itself throw (circular cause, BigInt); guard
 * it so the original error is never masked inside runJob's catch block.
 *
 * @param {unknown} err - The thrown value.
 * @returns {string} Multi-line error description.
 */
function describeError(err) {
  if (!err?.message) return String(err);
  let causeText = '';
  if (err.cause) {
    try {
      causeText = `\nCause: ${JSON.stringify(err.cause, null, 2)}`;
    } catch {
      causeText = `\nCause: ${String(err.cause)}`;
    }
  }
  return `${err.name ?? 'Error'}: ${err.message}${causeText}`;
}

// ---------------------------------------------------------------------------
// Main runner
// ---------------------------------------------------------------------------

/**
 * Execute one vision job: load it from the DB, call the model with the
 * stored prompt + image, and persist the result (or the error detail).
 *
 * @param {number|string} jobId - Primary key of the Job row to run.
 */
async function runJob(jobId) {
  const job = await Job.findByPk(jobId);
  if (!job) return; // job was deleted before it ran; nothing to do

  await setStatus(job, 'running');

  try {
    const { text, usage } = await generateText({
      model: resolveModel(job.model),
      messages: [
        {
          role: 'user',
          content: [
            { type: 'text', text: job.prompt },
            { type: 'image', image: job.imageDataUrl },
          ],
        },
      ],
      maxTokens: 1024,
    });

    await setStatus(job, 'done', {
      result: text,
      inputTokens: usage?.promptTokens ?? null,
      outputTokens: usage?.completionTokens ?? null,
    });
  } catch (err) {
    await setStatus(job, 'error', { errorMessage: describeError(err) });
  }
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/**
 * Schedule a job for execution on the in-process queue.
 *
 * @param {number|string} jobId - Primary key of the Job row to enqueue.
 */
export function enqueueJob(jobId) {
  // runJob handles per-job model errors itself, but a rejection that escapes
  // it (e.g. Job.findByPk or the first setStatus failing) would otherwise
  // surface as an unhandled promise rejection from queue.add().
  queue.add(() => runJob(jobId)).catch((err) => {
    console.error(`[queue] job ${jobId} failed outside runJob:`, err);
  });
}