From cc57742bc8ea560b50686290cf6abe83c609ddb1 Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Fri, 28 Mar 2025 22:04:48 +0000 Subject: [PATCH] binary chunk upload --- .../server/classes/ChunkFileUpload/index.js | 342 ++++++++++-------- packages/server/classes/SSEManager/index.js | 13 +- .../files/classes/ChunkFileUpload/index.js | 208 +++++++++++ .../files/routes/upload/chunk/post.js | 44 ++- .../upload/sse_events/[sse_channel_id]/get.js | 6 +- 5 files changed, 432 insertions(+), 181 deletions(-) create mode 100755 packages/server/services/files/classes/ChunkFileUpload/index.js diff --git a/packages/server/classes/ChunkFileUpload/index.js b/packages/server/classes/ChunkFileUpload/index.js index e0deb957..98ea5a1e 100755 --- a/packages/server/classes/ChunkFileUpload/index.js +++ b/packages/server/classes/ChunkFileUpload/index.js @@ -6,213 +6,239 @@ import path from "node:path" import mimetypes from "mime-types" export function checkTotalSize( - chunkSize, // in bytes - totalChunks, // number of chunks - maxFileSize, // in bytes + chunkSize, // in bytes + totalChunks, // number of chunks + maxFileSize, // in bytes ) { - const totalSize = chunkSize * totalChunks + const totalSize = chunkSize * totalChunks - if (totalSize > maxFileSize) { - return false - } + if (totalSize > maxFileSize) { + return false + } - return true + return true } export function checkChunkUploadHeaders(headers) { - const requiredHeaders = [ - "uploader-chunk-number", - "uploader-chunks-total", - "uploader-original-name", - "uploader-file-id" - ] + const requiredHeaders = [ + "uploader-chunk-number", + "uploader-chunks-total", + "uploader-original-name", + "uploader-file-id", + ] - for (const header of requiredHeaders) { - if (!headers[header] || typeof headers[header] !== "string") { - return false - } + for (const header of requiredHeaders) { + if (!headers[header] || typeof headers[header] !== "string") { + return false + } - if ( - (header === "uploader-chunk-number" || header === "uploader-chunks-total") - && !/^[0-9]+$/.test(headers[header]) - ) { - return false - } - } + if ( + (header === "uploader-chunk-number" || + header === "uploader-chunks-total") && + !/^[0-9]+$/.test(headers[header]) + ) { + return false + } + } - return true + return true } - export function createAssembleChunksPromise({ - chunksPath, - filePath, - maxFileSize, + chunksPath, + filePath, + maxFileSize, }) { - return () => new Promise(async (resolve, reject) => { - let fileSize = 0 + return () => + new Promise(async (resolve, reject) => { + let fileSize = 0 - if (!fs.existsSync(chunksPath)) { - return reject(new OperationError(500, "No chunks found")) - } + if (!fs.existsSync(chunksPath)) { + return reject(new OperationError(500, "No chunks found")) + } - let chunks = await fs.promises.readdir(chunksPath) + let chunks = await fs.promises.readdir(chunksPath) - if (chunks.length === 0) { - return reject(new OperationError(500, "No chunks found")) - } + if (chunks.length === 0) { + return reject(new OperationError(500, "No chunks found")) + } - // Ordenar los chunks numéricamente - chunks = chunks.sort((a, b) => { - const aNum = parseInt(a, 10) - const bNum = parseInt(b, 10) + // Ordenar los chunks numéricamente + chunks = chunks.sort((a, b) => { + const aNum = parseInt(a, 10) + const bNum = parseInt(b, 10) - return aNum - bNum - }) + return aNum - bNum + }) - for (const chunk of chunks) { - const chunkPath = path.join(chunksPath, chunk) + for (const chunk of chunks) { + const chunkPath = path.join(chunksPath, chunk) - if 
(!fs.existsSync(chunkPath)) { - return reject(new OperationError(500, "No chunk data found")) - } + if (!fs.existsSync(chunkPath)) { + return reject( + new OperationError(500, "No chunk data found"), + ) + } - const data = await fs.promises.readFile(chunkPath) - fileSize += data.length + const data = await fs.promises.readFile(chunkPath) + fileSize += data.length - if (fileSize > maxFileSize) { - return reject(new OperationError(413, "File exceeds max total file size, aborting assembly...")) - } + if (fileSize > maxFileSize) { + return reject( + new OperationError( + 413, + "File exceeds max total file size, aborting assembly...", + ), + ) + } - await fs.promises.appendFile(filePath, data) - } + await fs.promises.appendFile(filePath, data) + } - return resolve({ - chunksLength: chunks.length, - filePath: filePath, - }) - }) + return resolve({ + chunksLength: chunks.length, + filePath: filePath, + }) + }) } export async function handleChunkFile( - fileStream, - { - tmpDir, - headers, - maxFileSize, - maxChunkSize - } + fileStream, + { tmpDir, headers, maxFileSize, maxChunkSize }, ) { - return await new Promise(async (resolve, reject) => { - const workPath = path.join(tmpDir, headers["uploader-file-id"]) - const chunksPath = path.join(workPath, "chunks") - const chunkPath = path.join(chunksPath, headers["uploader-chunk-number"]) + return await new Promise(async (resolve, reject) => { + const workPath = path.join(tmpDir, headers["uploader-file-id"]) + const chunksPath = path.join(workPath, "chunks") + const chunkPath = path.join( + chunksPath, + headers["uploader-chunk-number"], + ) - const chunkCount = +headers["uploader-chunk-number"] - const totalChunks = +headers["uploader-chunks-total"] + const chunkCount = +headers["uploader-chunk-number"] + const totalChunks = +headers["uploader-chunks-total"] - // check if file has all chunks uploaded - const isLast = chunkCount === totalChunks - 1 + // check if file has all chunks uploaded + const isLast = chunkCount === totalChunks - 1 - // make sure chunk is in range - if (chunkCount < 0 || chunkCount >= totalChunks) { - return reject(new OperationError(500, "Chunk is out of range")) - } + // make sure chunk is in range + if (chunkCount < 0 || chunkCount >= totalChunks) { + return reject(new OperationError(500, "Chunk is out of range")) + } - // if is the first chunk check if dir exists before write things - if (chunkCount === 0) { - try { - if (!await fs.promises.stat(chunksPath).catch(() => false)) { - await fs.promises.mkdir(chunksPath, { recursive: true }) - } - } catch (error) { - return reject(new OperationError(500, error.message)) - } - } + // if is the first chunk check if dir exists before write things + if (chunkCount === 0) { + try { + if (!(await fs.promises.stat(chunksPath).catch(() => false))) { + await fs.promises.mkdir(chunksPath, { recursive: true }) + } + } catch (error) { + return reject(new OperationError(500, error.message)) + } + } - let dataWritten = 0 + let dataWritten = 0 - let writeStream = fs.createWriteStream(chunkPath) + let writeStream = fs.createWriteStream(chunkPath) - writeStream.on("error", (err) => { - reject(err) - }) + writeStream.on("error", (err) => { + reject(err) + }) - writeStream.on("close", () => { - if (maxChunkSize !== undefined) { - if (dataWritten > maxChunkSize) { - reject(new OperationError(413, "Chunk size exceeds max chunk size, aborting upload...")) - return - } + writeStream.on("close", () => { + if (maxChunkSize !== undefined) { + if (dataWritten > maxChunkSize) { + reject( + new 
OperationError( + 413, + "Chunk size exceeds max chunk size, aborting upload...", + ), + ) + return + } - // estimate total file size, - // if estimation exceeds maxFileSize, abort upload - if (chunkCount === 0 && totalChunks > 0) { - if ((dataWritten * (totalChunks - 1)) > maxFileSize) { - reject(new OperationError(413, "File estimated size exceeds max total file size, aborting upload...")) - return - } - } - } + // estimate total file size, + // if estimation exceeds maxFileSize, abort upload + if (chunkCount === 0 && totalChunks > 0) { + if (dataWritten * (totalChunks - 1) > maxFileSize) { + reject( + new OperationError( + 413, + "File estimated size exceeds max total file size, aborting upload...", + ), + ) + return + } + } + } - if (isLast) { - const mimetype = mimetypes.lookup(headers["uploader-original-name"]) - const extension = mimetypes.extension(mimetype) + if (isLast) { + const mimetype = mimetypes.lookup( + headers["uploader-original-name"], + ) + const extension = mimetypes.extension(mimetype) - let filename = headers["uploader-file-id"] + let filename = headers["uploader-file-id"] - if (headers["uploader-use-date"] === "true") { - filename = `${filename}_${Date.now()}` - } + if (headers["uploader-use-date"] === "true") { + filename = `${filename}_${Date.now()}` + } - return resolve(createAssembleChunksPromise({ - // build data - chunksPath: chunksPath, - filePath: path.resolve(workPath, `${filename}.${extension}`), - maxFileSize: maxFileSize, - })) - } + return resolve( + createAssembleChunksPromise({ + // build data + chunksPath: chunksPath, + filePath: path.resolve( + workPath, + `${filename}.${extension}`, + ), + maxFileSize: maxFileSize, + }), + ) + } - return resolve(null) - }) + return resolve(null) + }) - fileStream.on("data", (buffer) => { - dataWritten += buffer.byteLength - }) + fileStream.on("data", (buffer) => { + dataWritten += buffer.byteLength + }) - fileStream.pipe(writeStream) - }) + fileStream.pipe(writeStream) + }) } export async function uploadChunkFile( - req, - { - tmpDir, - maxFileSize, - maxChunkSize, - } + req, + { tmpDir, maxFileSize, maxChunkSize }, ) { - return await new Promise(async (resolve, reject) => { - if (!checkChunkUploadHeaders(req.headers)) { - reject(new OperationError(400, "Missing header(s)")) - return - } + // create a readable stream from req.body data blob + // + const chunkData = new Blob([req.body], { type: "application/octet-stream" }) - await req.multipart(async (field) => { - try { - const result = await handleChunkFile(field.file.stream, { - tmpDir: tmpDir, - headers: req.headers, - maxFileSize: maxFileSize, - maxChunkSize: maxChunkSize, - }) + console.log(chunkData) - return resolve(result) - } catch (error) { - return reject(error) - } - }) - }) + if (!checkChunkUploadHeaders(req.headers)) { + reject(new OperationError(400, "Missing header(s)")) + return + } + + // return await new Promise(async (resolve, reject) => { + // // create a readable node stream from "req.body" (octet-stream) + // await req.multipart(async (field) => { + // try { + // const result = await handleChunkFile(field.file.stream, { + // tmpDir: tmpDir, + // headers: req.headers, + // maxFileSize: maxFileSize, + // maxChunkSize: maxChunkSize, + // }) + + // return resolve(result) + // } catch (error) { + // return reject(error) + // } + // }) + // }) } -export default uploadChunkFile \ No newline at end of file +export default uploadChunkFile diff --git a/packages/server/classes/SSEManager/index.js b/packages/server/classes/SSEManager/index.js index 
862924c7..b858ed0f 100644 --- a/packages/server/classes/SSEManager/index.js +++ b/packages/server/classes/SSEManager/index.js @@ -33,7 +33,6 @@ export default class SSEManager { if (!channel) { channel = this.createChannel(channelId) - //throw new OperationError(404, `Channel [${channelId}] not found`) } channel.clients.add(req) @@ -43,16 +42,16 @@ export default class SSEManager { res.setHeader("Connection", "keep-alive") res.status(200) - // if (channel.cache.length > 0) { - // for (const oldData of channel.cache) { - // this.writeJSONToResponse(res, oldData) - // } - // } - this.writeJSONToResponse(res, { event: "connected", }) + if (channel.cache.length > 0) { + for (const oldData of channel.cache) { + this.writeJSONToResponse(res, oldData) + } + } + if (initialData) { this.writeJSONToResponse(res, initialData) } diff --git a/packages/server/services/files/classes/ChunkFileUpload/index.js b/packages/server/services/files/classes/ChunkFileUpload/index.js new file mode 100755 index 00000000..8c677970 --- /dev/null +++ b/packages/server/services/files/classes/ChunkFileUpload/index.js @@ -0,0 +1,208 @@ +// Orginal forked from: Buzut/huge-uploader-nodejs +// Copyright (c) 2018, Quentin Busuttil All rights reserved. + +import fs from "node:fs" +import path from "node:path" +import mimetypes from "mime-types" + +export function checkTotalSize( + chunkSize, // in bytes + totalChunks, // number of chunks + maxFileSize, // in bytes +) { + const totalSize = chunkSize * totalChunks + + if (totalSize > maxFileSize) { + return false + } + + return true +} + +export function checkChunkUploadHeaders(headers) { + const requiredHeaders = [ + "uploader-chunk-number", + "uploader-chunks-total", + "uploader-original-name", + "uploader-file-id", + ] + + for (const header of requiredHeaders) { + if (!headers[header] || typeof headers[header] !== "string") { + return false + } + + if ( + (header === "uploader-chunk-number" || + header === "uploader-chunks-total") && + !/^[0-9]+$/.test(headers[header]) + ) { + return false + } + } + + return true +} + +export function createAssembleChunksPromise({ + chunksPath, + filePath, + maxFileSize, +}) { + return () => + new Promise(async (resolve, reject) => { + let fileSize = 0 + + if (!fs.existsSync(chunksPath)) { + return reject(new OperationError(500, "No chunks found")) + } + + let chunks = await fs.promises.readdir(chunksPath) + + if (chunks.length === 0) { + return reject(new OperationError(500, "No chunks found")) + } + + // Ordenar los chunks numéricamente + chunks = chunks.sort((a, b) => { + const aNum = parseInt(a, 10) + const bNum = parseInt(b, 10) + + return aNum - bNum + }) + + for (const chunk of chunks) { + const chunkPath = path.join(chunksPath, chunk) + + if (!fs.existsSync(chunkPath)) { + return reject( + new OperationError(500, "No chunk data found"), + ) + } + + const data = await fs.promises.readFile(chunkPath) + fileSize += data.length + + if (fileSize > maxFileSize) { + return reject( + new OperationError( + 413, + "File exceeds max total file size, aborting assembly...", + ), + ) + } + + await fs.promises.appendFile(filePath, data) + } + + return resolve({ + chunksLength: chunks.length, + filePath: filePath, + }) + }) +} + +export async function handleChunkFile( + fileStream, + { tmpDir, headers, maxFileSize, maxChunkSize }, +) { + return await new Promise(async (resolve, reject) => { + const workPath = path.join(tmpDir, headers["uploader-file-id"]) + const chunksPath = path.join(workPath, "chunks") + const chunkPath = path.join( + chunksPath, + 
headers["uploader-chunk-number"], + ) + + const chunkCount = +headers["uploader-chunk-number"] + const totalChunks = +headers["uploader-chunks-total"] + + // check if file has all chunks uploaded + const isLast = chunkCount === totalChunks - 1 + + // make sure chunk is in range + if (chunkCount < 0 || chunkCount >= totalChunks) { + return reject(new OperationError(500, "Chunk is out of range")) + } + + // if is the first chunk check if dir exists before write things + if (chunkCount === 0) { + try { + if (!(await fs.promises.stat(chunksPath).catch(() => false))) { + await fs.promises.mkdir(chunksPath, { recursive: true }) + } + } catch (error) { + return reject(new OperationError(500, error.message)) + } + } + + let dataWritten = 0 + + let writeStream = fs.createWriteStream(chunkPath) + + writeStream.on("error", (err) => { + reject(err) + }) + + writeStream.on("close", () => { + if (maxChunkSize !== undefined) { + if (dataWritten > maxChunkSize) { + reject( + new OperationError( + 413, + "Chunk size exceeds max chunk size, aborting upload...", + ), + ) + return + } + + // estimate total file size, + // if estimation exceeds maxFileSize, abort upload + if (chunkCount === 0 && totalChunks > 0) { + if (dataWritten * (totalChunks - 1) > maxFileSize) { + reject( + new OperationError( + 413, + "File estimated size exceeds max total file size, aborting upload...", + ), + ) + return + } + } + } + + if (isLast) { + const mimetype = mimetypes.lookup( + headers["uploader-original-name"], + ) + const extension = mimetypes.extension(mimetype) + + let filename = headers["uploader-file-id"] + + if (headers["uploader-use-date"] === "true") { + filename = `${filename}_${Date.now()}` + } + + return resolve( + createAssembleChunksPromise({ + // build data + chunksPath: chunksPath, + filePath: path.resolve( + workPath, + `${filename}.${extension}`, + ), + maxFileSize: maxFileSize, + }), + ) + } + + return resolve(null) + }) + + fileStream.on("data", (buffer) => { + dataWritten += buffer.byteLength + }) + + fileStream.pipe(writeStream) + }) +} diff --git a/packages/server/services/files/routes/upload/chunk/post.js b/packages/server/services/files/routes/upload/chunk/post.js index 53eb482e..63ff90fd 100644 --- a/packages/server/services/files/routes/upload/chunk/post.js +++ b/packages/server/services/files/routes/upload/chunk/post.js @@ -1,15 +1,30 @@ -import path from "path" -import fs from "fs" +import { Duplex } from "node:stream" +import path from "node:path" +import fs from "node:fs" import RemoteUpload from "@services/remoteUpload" - -import ChunkFileUpload from "@shared-classes/ChunkFileUpload" +import { + checkChunkUploadHeaders, + handleChunkFile, +} from "@classes/ChunkFileUpload" const availableProviders = ["b2", "standard"] +function bufferToStream(bf) { + let tmp = new Duplex() + tmp.push(bf) + tmp.push(null) + return tmp +} + export default { useContext: ["cache", "limits"], middlewares: ["withAuthentication"], fn: async (req, res) => { + if (!checkChunkUploadHeaders(req.headers)) { + reject(new OperationError(400, "Missing header(s)")) + return + } + const uploadId = `${req.headers["uploader-file-id"]}_${Date.now()}` const tmpPath = path.resolve( @@ -47,23 +62,26 @@ export default { throw new OperationError(400, "Invalid provider") } - let build = await ChunkFileUpload(req, { + // create a readable stream from req.body(buffer) + const dataStream = bufferToStream(await req.buffer()) + + let result = await handleChunkFile(dataStream, { tmpDir: tmpPath, - ...limits, - }).catch((err) => { - throw new 
+			headers: req.headers,
+			maxFileSize: limits.maxFileSize,
+			maxChunkSize: limits.maxChunkSize,
 		})
 
-		if (typeof build === "function") {
+		if (typeof result === "function") {
 			try {
-				build = await build()
+				result = await result()
 
 				if (req.headers["transmux"] || limits.useCompression === true) {
 					// add a background task
 					const job = await global.queues.createJob(
 						"remote_upload",
 						{
-							filePath: build.filePath,
+							filePath: result.filePath,
 							parentDir: req.auth.session.user_id,
 							service: limits.useProvider,
 							useCompression: limits.useCompression,
@@ -81,11 +99,11 @@ export default {
 					return {
 						uploadId: uploadId,
 						sseChannelId: sseChannelId,
-						eventChannelURL: `${req.protocol}://${req.get("host")}/upload/sse_events/${sseChannelId}`,
+						eventChannelURL: `${req.headers["x-forwarded-proto"] || req.protocol}://${req.get("host")}/upload/sse_events/${sseChannelId}`,
 					}
 				} else {
 					const result = await RemoteUpload({
-						source: build.filePath,
+						source: result.filePath,
 						parentDir: req.auth.session.user_id,
 						service: limits.useProvider,
 						useCompression: limits.useCompression,
diff --git a/packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js b/packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js
index 8396f467..1665e4d0 100644
--- a/packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js
+++ b/packages/server/services/files/routes/upload/sse_events/[sse_channel_id]/get.js
@@ -1,5 +1,5 @@
 export default async (req, res) => {
-    const { sse_channel_id } = req.params
+	const { sse_channel_id } = req.params
 
-    global.sse.connectToChannelStream(sse_channel_id, req, res)
-}
\ No newline at end of file
+	global.sse.connectToChannelStream(sse_channel_id, req, res)
+}