2025-03-25 23:04:38 +00:00

147 lines
3.1 KiB
JavaScript

import fs from "node:fs"
import { Queue, Worker } from "bullmq"
import { composeURL as composeRedisConnectionString } from "@shared-classes/RedisClient"
export default class TaskQueueManager {
constructor(params) {
if (!params) {
throw new Error("Missing params")
}
this.params = params
this.queues = {}
this.workers = {}
}
async initialize(options = {}) {
const queues = fs.readdirSync(this.params.workersPath)
for await (const queue of queues) {
const queuePath = `${this.params.workersPath}/${queue}`
let queueObj = await import(queuePath)
queueObj = queueObj.default ?? queueObj
if (typeof queueObj.process === "undefined") {
continue
}
this.queues[queueObj.id] = await this.registerQueue(
queueObj,
options,
)
}
}
registerQueue = (queueObj, options) => {
const queue = new Queue(queueObj.id, {
connection: options.redisOptions,
defaultJobOptions: {
removeOnComplete: true,
},
})
const worker = new Worker(queueObj.id, queueObj.process, {
connection: options.redisOptions,
concurrency: queueObj.maxJobs ?? 1,
})
this.registerQueueEvents(worker)
this.queues[queueObj.id] = queue
this.workers[queueObj.id] = worker
return queue
}
registerQueueEvents = (worker) => {
worker.on("progress", (job, progress) => {
try {
console.log(`Job ${job.id} reported progress: ${progress}%`)
if (job.data.sseChannelId) {
global.sse.sendToChannel(job.data.sseChannelId, {
status: "progress",
events: "job_progress",
progress,
})
}
} catch (error) {
// manejar error
}
})
worker.on("completed", (job, result) => {
try {
console.log(`Job ${job.id} completed with result:`, result)
if (job.data.sseChannelId) {
global.sse.sendToChannel(job.data.sseChannelId, {
status: "done",
result,
})
}
} catch (error) {}
})
worker.on("failed", (job, error) => {
try {
console.error(`Job ${job.id} failed:`, error)
if (job.data.sseChannelId) {
global.sse.sendToChannel(job.data.sseChannelId, {
status: "error",
result: error.message,
})
}
} catch (error) {}
})
}
createJob = async (queueId, data, { useSSE = false } = {}) => {
const queue = this.queues[queueId]
if (!queue) {
throw new Error("Queue not found")
}
let sseChannelId = null
if (useSSE) {
sseChannelId = `${global.nanoid()}`
}
const job = await queue.add("default", {
...data,
sseChannelId,
})
if (sseChannelId) {
await global.sse.createChannel(sseChannelId)
console.log(
`[JOB] Created new job with SSE channel [${sseChannelId}]`,
)
await global.sse.sendToChannel(sseChannelId, {
status: "progress",
events: "job_queued",
progress: 5,
})
}
console.log(`[JOB] Created new job with ID [${job.id}]`)
return {
...job,
sseChannelId,
}
}
// this function cleans up all queues, must be synchronous
cleanUp = () => {
const queues = Object.values(this.queues)
queues.forEach((queue) => queue.close())
console.log("All queues have been closed")
}
}