support for background tasks

This commit is contained in:
SrGooglo 2025-02-05 02:49:06 +00:00
parent 49fa30fc4f
commit 288846ff99
2 changed files with 94 additions and 61 deletions

View File

@@ -1,82 +1,110 @@
import path from "path" import path from "path"
import fs from "fs" import fs from "fs"
import RemoteUpload from "@services/remoteUpload"
import ChunkFileUpload from "@shared-classes/ChunkFileUpload" import ChunkFileUpload from "@shared-classes/ChunkFileUpload"
import RemoteUpload from "@services/remoteUpload"
// Storage backends a chunked upload may be routed to.
const availableProviders = ["b2", "standard"]

/**
 * Chunked file upload endpoint.
 *
 * Receives one chunk per request via `ChunkFileUpload`. While chunks are
 * still pending it replies with an ack (`{ ok, chunkNumber }`). Once the
 * final chunk arrives, the assembled file is either:
 *  - queued as a background job ("remote_upload") when transmuxing or
 *    compression is requested — the client is handed an SSE channel URL
 *    to follow progress, or
 *  - uploaded synchronously via `RemoteUpload`, returning its result.
 *
 * Admin users get a raised size limit (100 GB) and may select compression
 * and provider through request headers.
 *
 * @throws {OperationError} 400 on invalid provider; upstream chunk/upload
 *   errors are re-wrapped with their code/message (500 fallback).
 */
export default {
	useContext: ["cache", "limits"],
	middlewares: ["withAuthentication"],
	fn: async (req, res) => {
		// Client-scoped identifier for this upload session.
		const uploadId = `${req.headers["uploader-file-id"]}_${Date.now()}`

		// Per-user scratch directory under the cache root.
		// NOTE(review): `this.default.contexts` inside an arrow function
		// relies on the module's `this` binding — looks like a framework
		// convention here; confirm it resolves as intended.
		const tmpPath = path.resolve(
			this.default.contexts.cache.constructor.cachePath,
			req.auth.session.user_id,
		)

		const limits = {
			maxFileSize:
				parseInt(this.default.contexts.limits.maxFileSizeInMB) *
				1024 *
				1024,
			maxChunkSize:
				parseInt(this.default.contexts.limits.maxChunkSizeInMB) *
				1024 *
				1024,
			useCompression: true,
			useProvider: "standard",
		}

		const user = await req.auth.user()

		if (user.roles.includes("admin")) {
			// maxFileSize for admins 100GB
			limits.maxFileSize = 100 * 1024 * 1024 * 1024

			// optional compression for admins
			// NOTE(review): header values are strings, so "false" would be
			// truthy here — confirm clients send this header only to enable.
			limits.useCompression = req.headers["use-compression"] ?? false

			limits.useProvider = req.headers["provider-type"] ?? "b2"
		}

		// check if provider is valid
		if (!availableProviders.includes(limits.useProvider)) {
			throw new OperationError(400, "Invalid provider")
		}

		// Returns a finalizer function once the last chunk has arrived;
		// otherwise the upload is still in progress.
		let build = await ChunkFileUpload(req, {
			tmpDir: tmpPath,
			...limits,
		}).catch((err) => {
			throw new OperationError(err.code, err.message)
		})

		if (typeof build === "function") {
			try {
				// Assemble the chunks into the final file.
				build = await build()

				if (req.headers["transmux"] || limits.useCompression === true) {
					// Heavy post-processing: hand off to a background task
					// and let the client follow progress over SSE.
					const job = await global.queues.createJob("remote_upload", {
						filePath: build.filePath,
						parentDir: req.auth.session.user_id,
						service: limits.useProvider,
						useCompression: limits.useCompression,
						transmux: req.headers["transmux"] ?? false,
						transmuxOptions: req.headers["transmux-options"],
						cachePath: tmpPath,
					})

					const sseChannelId = job.sseChannelId

					return {
						uploadId: uploadId,
						sseChannelId: sseChannelId,
						eventChannelURL: `https://${req.get("host")}/upload/sse_events/${sseChannelId}`,
					}
				} else {
					// Fast path: upload synchronously in-request.
					const result = await RemoteUpload({
						source: build.filePath,
						parentDir: req.auth.session.user_id,
						service: limits.useProvider,
						useCompression: limits.useCompression,
						cachePath: tmpPath,
					})

					// Best-effort cleanup of the scratch directory on the
					// success path too (previously only done on failure,
					// leaking the per-user temp dir after direct uploads).
					await fs.promises
						.rm(tmpPath, { recursive: true, force: true })
						.catch(() => {
							return false
						})

					return result
				}
			} catch (error) {
				// Best-effort cleanup before surfacing the error.
				await fs.promises
					.rm(tmpPath, { recursive: true, force: true })
					.catch(() => {
						return false
					})

				throw new OperationError(
					error.code ?? 500,
					error.message ?? "Failed to upload file",
				)
			}
		}

		// Mid-upload ack: more chunks expected.
		return {
			ok: 1,
			chunkNumber: req.headers["uploader-chunk-number"],
		}
	},
}

View File

@@ -0,0 +1,5 @@
/**
 * Attaches the HTTP response to an SSE channel stream so the client
 * receives events published on that channel (e.g. background upload
 * progress). The channel id comes from the route parameter
 * `sse_channel_id`; the stream itself is managed by `global.sse`.
 */
export default async (req, res) => {
	const channelId = req.params.sse_channel_id

	global.sse.connectToChannelStream(channelId, req, res)
}