Files
vision-server/jobs/queue.js

176 lines
5.5 KiB
JavaScript

import PQueue from 'p-queue';
import { generateText } from 'ai';
import { createAnthropic } from '@ai-sdk/anthropic';
import { createOpenAI } from '@ai-sdk/openai';
import { createGoogleGenerativeAI } from '@ai-sdk/google';
import { createOpenAICompatible } from '@ai-sdk/openai-compatible';
import { GoogleGenAI } from '@google/genai';
import { Job } from '../db/models.js';
import { broadcast } from '../ws/broadcast.js';
import { findModel, DEFAULT_MODEL_ID, normalizeModelId } from '../models.js';
// ---------------------------------------------------------------------------
// Provider instances
// ---------------------------------------------------------------------------
const anthropic = createAnthropic({
apiKey: process.env.ANTHROPIC_API_KEY,
});
const openai = createOpenAI({
apiKey: process.env.OPENAI_API_KEY,
});
const google = createGoogleGenerativeAI({
apiKey: process.env.GOOGLE_API_KEY,
});
const geminiApi = new GoogleGenAI({
apiKey: process.env.GOOGLE_API_KEY,
});
// Ollama Cloud exposes an OpenAI-compatible /v1 endpoint.
// Using @ai-sdk/openai-compatible avoids the local-Ollama schema validation
// in ollama-ai-provider which requires fields (eval_duration etc.) that
// Ollama Cloud doesn't return.
const ollamaCloud = createOpenAICompatible({
name: 'ollama-cloud',
baseURL: 'https://ollama.com/v1',
headers: {
Authorization: `Bearer ${process.env.OLLAMA_API_KEY ?? ''}`,
},
});
const PROVIDERS = {
anthropic: (id) => anthropic(id),
openai: (id) => openai(id),
google: (id) => google(id),
ollama: (id) => ollamaCloud(id),
};
function resolveModel(modelId) {
const normalized = normalizeModelId(modelId);
const meta = findModel(normalized) ?? findModel(DEFAULT_MODEL_ID);
return PROVIDERS[meta.provider](meta.id);
}
function resolveModelMeta(modelId) {
const normalized = normalizeModelId(modelId);
const meta = findModel(normalized) ?? findModel(DEFAULT_MODEL_ID);
return { normalized, meta };
}
function dataUrlToInlineData(dataUrl) {
if (!dataUrl || typeof dataUrl !== 'string') return null;
// Expected: data:<mime>;base64,<data>
if (!dataUrl.startsWith('data:')) return null;
const comma = dataUrl.indexOf(',');
if (comma < 0) return null;
const header = dataUrl.slice(5, comma); // drop "data:"
const data = dataUrl.slice(comma + 1);
const isBase64 = header.includes(';base64');
const mimeType = header.split(';')[0] || 'application/octet-stream';
if (!isBase64 || !data) return null;
return { mimeType, data };
}
// ---------------------------------------------------------------------------
// p-queue: in-process queue, no external server needed
// ---------------------------------------------------------------------------
export const queue = new PQueue({
concurrency: parseInt(process.env.JOB_CONCURRENCY ?? '3', 10),
});
queue.on('add', () => broadcastQueueStats());
queue.on('next', () => broadcastQueueStats());
queue.on('idle', () => broadcastQueueStats());
function broadcastQueueStats() {
broadcast({ type: 'queue_stats', pending: queue.size, running: queue.pending });
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
async function setStatus(job, status, extra = {}) {
await job.update({ status, ...extra });
broadcast({ type: 'job_update', job: job.toJSON() });
}
// ---------------------------------------------------------------------------
// Main runner
// ---------------------------------------------------------------------------
async function runJob(jobId) {
const job = await Job.findByPk(jobId);
if (!job) return;
await setStatus(job, 'running');
try {
const { meta } = resolveModelMeta(job.model);
// Robotics-ER models are available via the Gemini API but may not be
// exposed through all provider wrappers. Route them through @google/genai.
if (String(meta.id).startsWith('gemini-robotics-er-')) {
const inline = dataUrlToInlineData(job.imageDataUrl);
const contents = [
{
role: 'user',
parts: [
...(inline ? [{ inlineData: inline }] : []),
{ text: job.prompt ?? '' },
],
},
];
const res = await geminiApi.models.generateContent({
model: meta.id,
contents,
});
const text = res?.text ?? '';
await setStatus(job, 'done', {
result: text,
inputTokens: null,
outputTokens: null,
});
return;
}
const { text, usage } = await generateText({
model: resolveModel(job.model),
messages: [
{
role: 'user',
content: [
{ type: 'text', text: job.prompt },
{ type: 'image', image: job.imageDataUrl },
],
},
],
maxTokens: 1024,
});
await setStatus(job, 'done', {
result: text,
inputTokens: usage?.promptTokens ?? null,
outputTokens: usage?.completionTokens ?? null,
});
} catch (err) {
const detail = err?.message
? `${err.name ?? 'Error'}: ${err.message}${err.cause ? `\nCause: ${JSON.stringify(err.cause, null, 2)}` : ''}`
: String(err);
await setStatus(job, 'error', { errorMessage: detail });
}
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
export function enqueueJob(jobId) {
queue.add(() => runJob(jobId));
}