import PQueue from 'p-queue';
import { generateText } from 'ai';
import { createAnthropic } from '@ai-sdk/anthropic';
import { createOpenAI } from '@ai-sdk/openai';
import { createGoogleGenerativeAI } from '@ai-sdk/google';
import { createOpenAICompatible } from '@ai-sdk/openai-compatible';
import { GoogleGenAI } from '@google/genai';
import { Job } from '../db/models.js';
import { broadcast } from '../ws/broadcast.js';
import { findModel, DEFAULT_MODEL_ID, normalizeModelId } from '../models.js';

// ---------------------------------------------------------------------------
// Provider instances
// ---------------------------------------------------------------------------

const anthropic = createAnthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

const openai = createOpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

const google = createGoogleGenerativeAI({
  apiKey: process.env.GOOGLE_API_KEY,
});

const geminiApi = new GoogleGenAI({
  apiKey: process.env.GOOGLE_API_KEY,
});

// Ollama Cloud exposes an OpenAI-compatible /v1 endpoint.
// Using @ai-sdk/openai-compatible avoids the local-Ollama schema validation
// in ollama-ai-provider which requires fields (eval_duration etc.) that
// Ollama Cloud doesn't return.
const ollamaCloud = createOpenAICompatible({
  name: 'ollama-cloud',
  baseURL: 'https://ollama.com/v1',
  headers: {
    Authorization: `Bearer ${process.env.OLLAMA_API_KEY ?? ''}`,
  },
});

// Maps the `provider` field of a model registry entry to a factory that
// builds the AI SDK model handle for a given provider-specific model id.
const PROVIDERS = {
  anthropic: (id) => anthropic(id),
  openai: (id) => openai(id),
  google: (id) => google(id),
  ollama: (id) => ollamaCloud(id),
};

/**
 * Build the AI SDK model handle for a job's model id.
 *
 * Falls back to DEFAULT_MODEL_ID when the id is not in the registry.
 *
 * @param {string} modelId - Model id as stored on the job.
 * @returns {*} Whatever the matching PROVIDERS factory returns.
 * @throws {Error} When neither the requested nor the default model is
 *   registered, or when the registry entry names a provider that has no
 *   factory in PROVIDERS.
 */
function resolveModel(modelId) {
  const { meta } = resolveModelMeta(modelId);
  if (!meta) {
    throw new Error(
      `Unknown model "${modelId}" and default model "${DEFAULT_MODEL_ID}" is not registered`,
    );
  }

  const createModel = PROVIDERS[meta.provider];
  if (typeof createModel !== 'function') {
    throw new Error(
      `No provider configured for "${meta.provider}" (model "${meta.id}")`,
    );
  }
  return createModel(meta.id);
}

/**
 * Look up the registry entry for a model id.
 *
 * @param {string} modelId - Model id as stored on the job.
 * @returns {{ normalized: *, meta: * }} `normalized` is the result of
 *   normalizeModelId(modelId); `meta` is the registry entry for it, or the
 *   entry for DEFAULT_MODEL_ID when findModel returns null/undefined.
 */
function resolveModelMeta(modelId) {
  const normalized = normalizeModelId(modelId);
  const meta = findModel(normalized) ?? findModel(DEFAULT_MODEL_ID);
  return { normalized, meta };
}

/**
 * Split a base64 data URL into its MIME type and payload.
 *
 * @param {string} dataUrl - Expected form: data:<mimeType>;base64,<payload>
 * @returns {{ mimeType: string, data: string } | null} The parsed parts, or
 *   null when the input is not a non-empty string, does not start with
 *   "data:", has no comma, is not marked ";base64", or has an empty payload.
 */
function dataUrlToInlineData(dataUrl) {
  if (!dataUrl || typeof dataUrl !== 'string') return null;
  if (!dataUrl.startsWith('data:')) return null;

  const comma = dataUrl.indexOf(',');
  if (comma < 0) return null;

  const header = dataUrl.slice(5, comma); // drop "data:"
  const data = dataUrl.slice(comma + 1);
  const isBase64 = header.includes(';base64');
  // An empty media type falls back to a generic binary type.
  const mimeType = header.split(';')[0] || 'application/octet-stream';

  if (!isBase64 || !data) return null;
  return { mimeType, data };
}

// ---------------------------------------------------------------------------
// p-queue: in-process queue, no external server needed
// ---------------------------------------------------------------------------

const DEFAULT_JOB_CONCURRENCY = 3;

/**
 * Parse the JOB_CONCURRENCY setting.
 *
 * PQueue's constructor throws unless `concurrency` is a number >= 1, so an
 * empty, non-numeric, or zero value would otherwise crash the process while
 * this module is being imported.
 *
 * @param {string | undefined} raw - Raw environment value.
 * @returns {number} A positive integer; DEFAULT_JOB_CONCURRENCY when `raw`
 *   is unset or invalid.
 */
function parseConcurrency(raw) {
  if (raw == null) return DEFAULT_JOB_CONCURRENCY;

  const parsed = Number.parseInt(raw, 10);
  if (Number.isInteger(parsed) && parsed >= 1) return parsed;

  console.warn(
    `Ignoring invalid JOB_CONCURRENCY "${raw}"; using ${DEFAULT_JOB_CONCURRENCY}`,
  );
  return DEFAULT_JOB_CONCURRENCY;
}

export const queue = new PQueue({
  concurrency: parseConcurrency(process.env.JOB_CONCURRENCY),
});

queue.on('add', () => broadcastQueueStats());
queue.on('active', () => broadcastQueueStats());
queue.on('next', () => broadcastQueueStats());
queue.on('idle', () => broadcastQueueStats());

/**
 * Broadcast the current queue counters as a `queue_stats` message.
 */
function broadcastQueueStats() {
  // p-queue naming: `size` counts tasks still waiting, `pending` counts tasks
  // currently executing - hence the apparent mismatch with the message keys.
  broadcast({ type: 'queue_stats', pending: queue.size, running: queue.pending });
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Persist a status change on a job and broadcast the updated job.
 *
 * @param {*} job - Job instance; must provide `update()` and `toJSON()`.
 * @param {string} status - New status value.
 * @param {object} [extra] - Additional fields written in the same update.
 * @returns {Promise<void>}
 */
async function setStatus(job, status, extra = {}) {
  await job.update({ status, ...extra });
  broadcast({ type: 'job_update', job: job.toJSON() });
}

/**
 * Render an error's `cause` as text without ever throwing.
 *
 * JSON.stringify throws on circular structures and BigInt values, and turns
 * an Error with no enumerable properties into "{}". Either outcome would lose
 * the diagnostic, and a throw here would abort the error handler that is
 * trying to record the failure.
 *
 * @param {*} cause - Value of `err.cause`.
 * @returns {string}
 */
function formatCause(cause) {
  let serialized;
  try {
    serialized = JSON.stringify(cause, null, 2);
  } catch {
    serialized = undefined; // not serializable (e.g. circular or BigInt)
  }

  if (cause instanceof Error) {
    const summary = `${cause.name}: ${cause.message}`;
    // Keep enumerable extras (e.g. `code`) when the error carries any.
    return serialized && serialized !== '{}' ? `${summary} ${serialized}` : summary;
  }
  return serialized ?? String(cause);
}

/**
 * Build the text stored in a failed job's `errorMessage`.
 *
 * @param {*} err - The caught value; not necessarily an Error.
 * @returns {string}
 */
function describeError(err) {
  if (!err?.message) return String(err);

  const causeText = err.cause ? `\nCause: ${formatCause(err.cause)}` : '';
  return `${err.name ?? 'Error'}: ${err.message}${causeText}`;
}

// ---------------------------------------------------------------------------
// Main runner
// ---------------------------------------------------------------------------

/**
 * Execute one job: mark it running, send its prompt and image to the model,
 * then store either the result or the failure.
 *
 * Returns without doing anything when no job with this id exists.
 *
 * @param {*} jobId - Primary key of the job.
 * @returns {Promise<void>} Rejects only when loading the job or persisting a
 *   status change fails; model errors are recorded on the job instead.
 */
async function runJob(jobId) {
  const job = await Job.findByPk(jobId);
  if (!job) return;

  await setStatus(job, 'running');

  try {
    const { text, usage } = await generateText({
      model: resolveModel(job.model),
      messages: [
        {
          role: 'user',
          content: [
            { type: 'text', text: job.prompt },
            { type: 'image', image: job.imageDataUrl },
          ],
        },
      ],
      maxTokens: 1024,
    });

    await setStatus(job, 'done', {
      result: text,
      inputTokens: usage?.promptTokens ?? null,
      outputTokens: usage?.completionTokens ?? null,
    });
  } catch (err) {
    await setStatus(job, 'error', { errorMessage: describeError(err) });
  }
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/**
 * Schedule a job on the in-process queue. Fire-and-forget: returns
 * immediately, and progress is reported through broadcasts.
 *
 * @param {*} jobId - Primary key of the job to run.
 * @returns {void}
 */
export function enqueueJob(jobId) {
  // runJob can still reject (job lookup or status persistence failing).
  // Without a handler that is an unhandled rejection, which terminates
  // current Node versions by default.
  queue
    .add(() => runJob(jobId))
    .catch((err) => {
      console.error(`Job ${jobId} failed outside of runJob's error handling:`, err);
    });
}