From 288846ff99158fa215dfb7df921c8af7cba6ef06 Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Wed, 5 Feb 2025 02:49:06 +0000 Subject: [PATCH] support for background tasks --- .../files/routes/upload/chunk/post.js | 150 +++++++++++------- .../upload/sse_events/[sse_channel_id]/get.js | 5 + 2 files changed, 94 insertions(+), 61 deletions(-) create mode 100644 packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js diff --git a/packages/server/services/files/routes/upload/chunk/post.js b/packages/server/services/files/routes/upload/chunk/post.js index fd91a225..8f49b6a3 100644 --- a/packages/server/services/files/routes/upload/chunk/post.js +++ b/packages/server/services/files/routes/upload/chunk/post.js @@ -1,82 +1,110 @@ import path from "path" import fs from "fs" +import RemoteUpload from "@services/remoteUpload" import ChunkFileUpload from "@shared-classes/ChunkFileUpload" -import RemoteUpload from "@services/remoteUpload" - const availableProviders = ["b2", "standard"] export default { - useContext: ["cache", "limits"], - middlewares: [ - "withAuthentication", - ], - fn: async (req, res) => { - const tmpPath = path.resolve(this.default.contexts.cache.constructor.cachePath, req.auth.session.user_id) + useContext: ["cache", "limits"], + middlewares: ["withAuthentication"], + fn: async (req, res) => { + const uploadId = `${req.headers["uploader-file-id"]}_${Date.now()}` - const limits = { - maxFileSize: parseInt(this.default.contexts.limits.maxFileSizeInMB) * 1024 * 1024, - maxChunkSize: parseInt(this.default.contexts.limits.maxChunkSizeInMB) * 1024 * 1024, - useCompression: true, - useProvider: "standard", - } + const tmpPath = path.resolve( + this.default.contexts.cache.constructor.cachePath, + req.auth.session.user_id, + ) - const user = await req.auth.user() + const limits = { + maxFileSize: + parseInt(this.default.contexts.limits.maxFileSizeInMB) * + 1024 * + 1024, + maxChunkSize: + parseInt(this.default.contexts.limits.maxChunkSizeInMB) * + 1024 * + 1024, + useCompression: true, + useProvider: "standard", + } - if (user.roles.includes("admin")) { - // maxFileSize for admins 100GB - limits.maxFileSize = 100 * 1024 * 1024 * 1024 + const user = await req.auth.user() - // optional compression for admins - limits.useCompression = req.headers["use-compression"] ?? false + if (user.roles.includes("admin")) { + // maxFileSize for admins 100GB + limits.maxFileSize = 100 * 1024 * 1024 * 1024 - limits.useProvider = req.headers["provider-type"] ?? "b2" - } + // optional compression for admins + limits.useCompression = req.headers["use-compression"] ?? false - // check if provider is valid - if (!availableProviders.includes(limits.useProvider)) { - throw new OperationError(400, "Invalid provider") - } + limits.useProvider = req.headers["provider-type"] ?? "b2" + } - let build = await ChunkFileUpload(req, { - tmpDir: tmpPath, - ...limits, - }).catch((err) => { - throw new OperationError(err.code, err.message) - }) + // check if provider is valid + if (!availableProviders.includes(limits.useProvider)) { + throw new OperationError(400, "Invalid provider") + } - if (typeof build === "function") { - try { - build = await build() + let build = await ChunkFileUpload(req, { + tmpDir: tmpPath, + ...limits, + }).catch((err) => { + throw new OperationError(err.code, err.message) + }) - const result = await RemoteUpload({ - parentDir: req.auth.session.user_id, - source: build.filePath, - service: limits.useProvider, - useCompression: limits.useCompression, - transmux: req.headers["transmux"] ?? false, - transmuxOptions: req.headers["transmux-options"], - cachePath: tmpPath, - }) + if (typeof build === "function") { + try { + build = await build() - await fs.promises.rm(tmpPath, { recursive: true, force: true }).catch(() => { - return false - }) + if (req.headers["transmux"] || limits.useCompression === true) { + // add a background task + const job = await global.queues.createJob("remote_upload", { + filePath: build.filePath, + parentDir: req.auth.session.user_id, + service: limits.useProvider, + useCompression: limits.useCompression, + transmux: req.headers["transmux"] ?? false, + transmuxOptions: req.headers["transmux-options"], + cachePath: tmpPath, + }) - return result - } catch (error) { - await fs.promises.rm(tmpPath, { recursive: true, force: true }).catch(() => { - return false - }) + const sseChannelId = job.sseChannelId - throw new OperationError(error.code ?? 500, error.message ?? "Failed to upload file") - } - } + return { + uploadId: uploadId, + sseChannelId: sseChannelId, + eventChannelURL: `https://${req.get("host")}/upload/sse_events/${sseChannelId}`, + } + } else { + const result = await RemoteUpload({ + source: build.filePath, + parentDir: req.auth.session.user_id, + service: limits.useProvider, + useCompression: limits.useCompression, + cachePath: tmpPath, + }) - return { - ok: 1, - chunkNumber: req.headers["uploader-chunk-number"], - } - } -} \ No newline at end of file + return result + } + } catch (error) { + await fs.promises + .rm(tmpPath, { recursive: true, force: true }) + .catch(() => { + return false + }) + + throw new OperationError( + error.code ?? 500, + error.message ?? "Failed to upload file", + ) + } + } + + return { + ok: 1, + chunkNumber: req.headers["uploader-chunk-number"], + } + }, +} diff --git a/packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js b/packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js new file mode 100644 index 00000000..8396f467 --- /dev/null +++ b/packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js @@ -0,0 +1,5 @@ +export default async (req, res) => { + const { sse_channel_id } = req.params + + global.sse.connectToChannelStream(sse_channel_id, req, res) +} \ No newline at end of file