binary chunk upload

SrGooglo 2025-03-28 22:04:48 +00:00
parent d6248827c6
commit cc57742bc8
5 changed files with 432 additions and 181 deletions
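For reference, a client drives this binary chunk upload by slicing the file and POSTing each piece as a raw application/octet-stream body carrying the uploader-* headers that checkChunkUploadHeaders validates below. The sketch that follows is hedged: the endpoint path (/upload/chunk) and the Authorization header are assumptions for illustration, not taken from this commit; only the header names, the binary body, and the last-chunk response fields (uploadId, sseChannelId, eventChannelURL) come from the code in the diff.

// Hedged client sketch for the binary chunk upload flow.
// Assumption: "/upload/chunk" and the bearer token are placeholders.
async function uploadFileInChunks(file, { endpoint = "/upload/chunk", chunkSize = 5 * 1024 * 1024, token } = {}) {
    const totalChunks = Math.ceil(file.size / chunkSize)
    const fileId = crypto.randomUUID()

    let lastResponse = null

    for (let index = 0; index < totalChunks; index++) {
        const chunk = file.slice(index * chunkSize, (index + 1) * chunkSize)

        const res = await fetch(endpoint, {
            method: "POST",
            headers: {
                "Content-Type": "application/octet-stream",
                "uploader-chunk-number": String(index),
                "uploader-chunks-total": String(totalChunks),
                "uploader-original-name": file.name,
                "uploader-file-id": fileId,
                ...(token ? { Authorization: `Bearer ${token}` } : {}),
            },
            body: chunk, // raw binary body, no multipart envelope
        })

        if (!res.ok) {
            throw new Error(`Chunk ${index} failed with status ${res.status}`)
        }

        lastResponse = await res.json().catch(() => null)
    }

    // on the last chunk the server responds with uploadId / sseChannelId / eventChannelURL
    return lastResponse
}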

View File

@@ -6,213 +6,239 @@ import path from "node:path"
import mimetypes from "mime-types"

export function checkTotalSize(
    chunkSize, // in bytes
    totalChunks, // number of chunks
    maxFileSize, // in bytes
) {
    const totalSize = chunkSize * totalChunks

    if (totalSize > maxFileSize) {
        return false
    }

    return true
}

export function checkChunkUploadHeaders(headers) {
    const requiredHeaders = [
        "uploader-chunk-number",
        "uploader-chunks-total",
        "uploader-original-name",
        "uploader-file-id",
    ]

    for (const header of requiredHeaders) {
        if (!headers[header] || typeof headers[header] !== "string") {
            return false
        }

        if (
            (header === "uploader-chunk-number" ||
                header === "uploader-chunks-total") &&
            !/^[0-9]+$/.test(headers[header])
        ) {
            return false
        }
    }

    return true
}

export function createAssembleChunksPromise({
    chunksPath,
    filePath,
    maxFileSize,
}) {
    return () =>
        new Promise(async (resolve, reject) => {
            let fileSize = 0

            if (!fs.existsSync(chunksPath)) {
                return reject(new OperationError(500, "No chunks found"))
            }

            let chunks = await fs.promises.readdir(chunksPath)

            if (chunks.length === 0) {
                return reject(new OperationError(500, "No chunks found"))
            }

            // sort the chunks numerically
            chunks = chunks.sort((a, b) => {
                const aNum = parseInt(a, 10)
                const bNum = parseInt(b, 10)

                return aNum - bNum
            })

            for (const chunk of chunks) {
                const chunkPath = path.join(chunksPath, chunk)

                if (!fs.existsSync(chunkPath)) {
                    return reject(
                        new OperationError(500, "No chunk data found"),
                    )
                }

                const data = await fs.promises.readFile(chunkPath)
                fileSize += data.length

                if (fileSize > maxFileSize) {
                    return reject(
                        new OperationError(
                            413,
                            "File exceeds max total file size, aborting assembly...",
                        ),
                    )
                }

                await fs.promises.appendFile(filePath, data)
            }

            return resolve({
                chunksLength: chunks.length,
                filePath: filePath,
            })
        })
}

export async function handleChunkFile(
    fileStream,
    { tmpDir, headers, maxFileSize, maxChunkSize },
) {
    return await new Promise(async (resolve, reject) => {
        const workPath = path.join(tmpDir, headers["uploader-file-id"])
        const chunksPath = path.join(workPath, "chunks")
        const chunkPath = path.join(
            chunksPath,
            headers["uploader-chunk-number"],
        )

        const chunkCount = +headers["uploader-chunk-number"]
        const totalChunks = +headers["uploader-chunks-total"]

        // check if this is the last chunk of the file
        const isLast = chunkCount === totalChunks - 1

        // make sure chunk is in range
        if (chunkCount < 0 || chunkCount >= totalChunks) {
            return reject(new OperationError(500, "Chunk is out of range"))
        }

        // if this is the first chunk, make sure the chunks dir exists before writing
        if (chunkCount === 0) {
            try {
                if (!(await fs.promises.stat(chunksPath).catch(() => false))) {
                    await fs.promises.mkdir(chunksPath, { recursive: true })
                }
            } catch (error) {
                return reject(new OperationError(500, error.message))
            }
        }

        let dataWritten = 0

        let writeStream = fs.createWriteStream(chunkPath)

        writeStream.on("error", (err) => {
            reject(err)
        })

        writeStream.on("close", () => {
            if (maxChunkSize !== undefined) {
                if (dataWritten > maxChunkSize) {
                    reject(
                        new OperationError(
                            413,
                            "Chunk size exceeds max chunk size, aborting upload...",
                        ),
                    )
                    return
                }

                // estimate total file size,
                // if estimation exceeds maxFileSize, abort upload
                if (chunkCount === 0 && totalChunks > 0) {
                    if (dataWritten * (totalChunks - 1) > maxFileSize) {
                        reject(
                            new OperationError(
                                413,
                                "File estimated size exceeds max total file size, aborting upload...",
                            ),
                        )
                        return
                    }
                }
            }

            if (isLast) {
                const mimetype = mimetypes.lookup(
                    headers["uploader-original-name"],
                )
                const extension = mimetypes.extension(mimetype)

                let filename = headers["uploader-file-id"]

                if (headers["uploader-use-date"] === "true") {
                    filename = `${filename}_${Date.now()}`
                }

                return resolve(
                    createAssembleChunksPromise({
                        // build data
                        chunksPath: chunksPath,
                        filePath: path.resolve(
                            workPath,
                            `${filename}.${extension}`,
                        ),
                        maxFileSize: maxFileSize,
                    }),
                )
            }

            return resolve(null)
        })

        fileStream.on("data", (buffer) => {
            dataWritten += buffer.byteLength
        })

        fileStream.pipe(writeStream)
    })
}

export async function uploadChunkFile(
    req,
    { tmpDir, maxFileSize, maxChunkSize },
) {
    // create a readable stream from req.body data blob
    //
    const chunkData = new Blob([req.body], { type: "application/octet-stream" })

    console.log(chunkData)

    if (!checkChunkUploadHeaders(req.headers)) {
        reject(new OperationError(400, "Missing header(s)"))
        return
    }

    // return await new Promise(async (resolve, reject) => {
    //     // create a readable node stream from "req.body" (octet-stream)
    //     await req.multipart(async (field) => {
    //         try {
    //             const result = await handleChunkFile(field.file.stream, {
    //                 tmpDir: tmpDir,
    //                 headers: req.headers,
    //                 maxFileSize: maxFileSize,
    //                 maxChunkSize: maxChunkSize,
    //             })
    //
    //             return resolve(result)
    //         } catch (error) {
    //             return reject(error)
    //         }
    //     })
    // })
}

export default uploadChunkFile

View File

@@ -33,7 +33,6 @@ export default class SSEManager {
        if (!channel) {
            channel = this.createChannel(channelId)
        }

        channel.clients.add(req)

@@ -43,16 +42,16 @@ export default class SSEManager {
        res.setHeader("Connection", "keep-alive")
        res.status(200)

        this.writeJSONToResponse(res, {
            event: "connected",
        })

        if (channel.cache.length > 0) {
            for (const oldData of channel.cache) {
                this.writeJSONToResponse(res, oldData)
            }
        }

        if (initialData) {
            this.writeJSONToResponse(res, initialData)
        }

View File

@@ -0,0 +1,208 @@
// Originally forked from: Buzut/huge-uploader-nodejs
// Copyright (c) 2018, Quentin Busuttil. All rights reserved.
import fs from "node:fs"
import path from "node:path"
import mimetypes from "mime-types"
export function checkTotalSize(
chunkSize, // in bytes
totalChunks, // number of chunks
maxFileSize, // in bytes
) {
const totalSize = chunkSize * totalChunks
if (totalSize > maxFileSize) {
return false
}
return true
}
export function checkChunkUploadHeaders(headers) {
const requiredHeaders = [
"uploader-chunk-number",
"uploader-chunks-total",
"uploader-original-name",
"uploader-file-id",
]
for (const header of requiredHeaders) {
if (!headers[header] || typeof headers[header] !== "string") {
return false
}
if (
(header === "uploader-chunk-number" ||
header === "uploader-chunks-total") &&
!/^[0-9]+$/.test(headers[header])
) {
return false
}
}
return true
}
export function createAssembleChunksPromise({
chunksPath,
filePath,
maxFileSize,
}) {
return () =>
new Promise(async (resolve, reject) => {
let fileSize = 0
if (!fs.existsSync(chunksPath)) {
return reject(new OperationError(500, "No chunks found"))
}
let chunks = await fs.promises.readdir(chunksPath)
if (chunks.length === 0) {
return reject(new OperationError(500, "No chunks found"))
}
// sort the chunks numerically
chunks = chunks.sort((a, b) => {
const aNum = parseInt(a, 10)
const bNum = parseInt(b, 10)
return aNum - bNum
})
for (const chunk of chunks) {
const chunkPath = path.join(chunksPath, chunk)
if (!fs.existsSync(chunkPath)) {
return reject(
new OperationError(500, "No chunk data found"),
)
}
const data = await fs.promises.readFile(chunkPath)
fileSize += data.length
if (fileSize > maxFileSize) {
return reject(
new OperationError(
413,
"File exceeds max total file size, aborting assembly...",
),
)
}
await fs.promises.appendFile(filePath, data)
}
return resolve({
chunksLength: chunks.length,
filePath: filePath,
})
})
}
export async function handleChunkFile(
fileStream,
{ tmpDir, headers, maxFileSize, maxChunkSize },
) {
return await new Promise(async (resolve, reject) => {
const workPath = path.join(tmpDir, headers["uploader-file-id"])
const chunksPath = path.join(workPath, "chunks")
const chunkPath = path.join(
chunksPath,
headers["uploader-chunk-number"],
)
const chunkCount = +headers["uploader-chunk-number"]
const totalChunks = +headers["uploader-chunks-total"]
// check if this is the last chunk of the file
const isLast = chunkCount === totalChunks - 1
// make sure chunk is in range
if (chunkCount < 0 || chunkCount >= totalChunks) {
return reject(new OperationError(500, "Chunk is out of range"))
}
// if this is the first chunk, make sure the chunks dir exists before writing
if (chunkCount === 0) {
try {
if (!(await fs.promises.stat(chunksPath).catch(() => false))) {
await fs.promises.mkdir(chunksPath, { recursive: true })
}
} catch (error) {
return reject(new OperationError(500, error.message))
}
}
let dataWritten = 0
let writeStream = fs.createWriteStream(chunkPath)
writeStream.on("error", (err) => {
reject(err)
})
writeStream.on("close", () => {
if (maxChunkSize !== undefined) {
if (dataWritten > maxChunkSize) {
reject(
new OperationError(
413,
"Chunk size exceeds max chunk size, aborting upload...",
),
)
return
}
// estimate total file size,
// if estimation exceeds maxFileSize, abort upload
if (chunkCount === 0 && totalChunks > 0) {
if (dataWritten * (totalChunks - 1) > maxFileSize) {
reject(
new OperationError(
413,
"File estimated size exceeds max total file size, aborting upload...",
),
)
return
}
}
}
if (isLast) {
const mimetype = mimetypes.lookup(
headers["uploader-original-name"],
)
const extension = mimetypes.extension(mimetype)
let filename = headers["uploader-file-id"]
if (headers["uploader-use-date"] === "true") {
filename = `${filename}_${Date.now()}`
}
return resolve(
createAssembleChunksPromise({
// build data
chunksPath: chunksPath,
filePath: path.resolve(
workPath,
`${filename}.${extension}`,
),
maxFileSize: maxFileSize,
}),
)
}
return resolve(null)
})
fileStream.on("data", (buffer) => {
dataWritten += buffer.byteLength
})
fileStream.pipe(writeStream)
})
}
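A minimal usage sketch of the two-phase contract these helpers expose, independent of the endpoint further down: handleChunkFile resolves null for intermediate chunks and resolves an assemble function for the last one, which the caller then invokes and awaits to get the final file path. Everything framework-specific below (the tmp directory under os.tmpdir(), the size limits, the receiveChunk wrapper itself) is an illustrative assumption, not part of this commit.

import os from "node:os"
import path from "node:path"
import { Readable } from "node:stream"

import { handleChunkFile } from "@classes/ChunkFileUpload"

// Hedged usage sketch; tmpDir location and limits are illustrative values.
async function receiveChunk(rawBodyBuffer, headers) {
    // Readable.from() emits a Buffer argument as a single chunk,
    // so this yields a one-shot readable stream over the chunk body.
    const result = await handleChunkFile(Readable.from(rawBodyBuffer), {
        tmpDir: path.join(os.tmpdir(), "chunk-uploads"),
        headers: headers,
        maxFileSize: 100 * 1024 * 1024, // 100 MB, illustrative
        maxChunkSize: 10 * 1024 * 1024, // 10 MB, illustrative
    })

    // intermediate chunks resolve to null
    if (typeof result !== "function") {
        return { done: false }
    }

    // the last chunk resolves to an assemble function that concatenates
    // every stored chunk into the final file and returns its path
    const { filePath, chunksLength } = await result()

    return { done: true, filePath, chunksLength }
}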

View File

@@ -1,15 +1,30 @@
import { Duplex } from "node:stream"
import path from "node:path"
import fs from "node:fs"

import RemoteUpload from "@services/remoteUpload"

import {
    checkChunkUploadHeaders,
    handleChunkFile,
} from "@classes/ChunkFileUpload"

const availableProviders = ["b2", "standard"]

function bufferToStream(bf) {
    let tmp = new Duplex()

    tmp.push(bf)
    tmp.push(null)

    return tmp
}

export default {
    useContext: ["cache", "limits"],
    middlewares: ["withAuthentication"],
    fn: async (req, res) => {
        if (!checkChunkUploadHeaders(req.headers)) {
            reject(new OperationError(400, "Missing header(s)"))
            return
        }

        const uploadId = `${req.headers["uploader-file-id"]}_${Date.now()}`

        const tmpPath = path.resolve(

@@ -47,23 +62,26 @@ export default {
            throw new OperationError(400, "Invalid provider")
        }

        // create a readable stream from req.body(buffer)
        const dataStream = bufferToStream(await req.buffer())

        let result = await handleChunkFile(dataStream, {
            tmpDir: tmpPath,
            headers: req.headers,
            maxFileSize: limits.maxFileSize,
            maxChunkSize: limits.maxChunkSize,
        })

        if (typeof result === "function") {
            try {
                result = await result()

                if (req.headers["transmux"] || limits.useCompression === true) {
                    // add a background task
                    const job = await global.queues.createJob(
                        "remote_upload",
                        {
                            filePath: result.filePath,
                            parentDir: req.auth.session.user_id,
                            service: limits.useProvider,
                            useCompression: limits.useCompression,

@@ -81,11 +99,11 @@ export default {
                    return {
                        uploadId: uploadId,
                        sseChannelId: sseChannelId,
                        eventChannelURL: `${req.headers["x-forwarded-proto"] || req.protocol}://${req.get("host")}/upload/sse_events/${sseChannelId}`,
                    }
                } else {
                    const result = await RemoteUpload({
                        source: result.filePath,
                        parentDir: req.auth.session.user_id,
                        service: limits.useProvider,
                        useCompression: limits.useCompression,
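A side note on the bufferToStream helper above: pushing the buffer into a bare Duplex and ending it works for a one-shot read, but stream.Readable.from gives an equivalent readable with less ceremony. A hedged alternative, not what this commit ships:

import { Readable } from "node:stream"

// Alternative helper: wrap a Buffer as a single-chunk readable stream.
// Readable.from() does not iterate a Buffer argument byte by byte; it emits
// the whole Buffer as one chunk, matching the Duplex-based helper above.
function bufferToStream(bf) {
    return Readable.from(bf)
}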

View File

@@ -1,5 +1,5 @@
export default async (req, res) => {
    const { sse_channel_id } = req.params

    global.sse.connectToChannelStream(sse_channel_id, req, res)
}
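On the client side, the eventChannelURL returned by the chunked upload endpoint points at this route, so progress can be consumed with a plain EventSource. A hedged sketch: the "connected" payload is the only event shape visible in this commit; the JSON-in-data framing and the terminal event name are assumptions about what the remote_upload job publishes.

// Hedged client sketch for the SSE progress channel.
function watchUpload(eventChannelURL, { onEvent, onDone, onError } = {}) {
    const source = new EventSource(eventChannelURL)

    source.onmessage = (message) => {
        let payload = null

        try {
            payload = JSON.parse(message.data)
        } catch {
            return // ignore non-JSON frames
        }

        if (onEvent) onEvent(payload)

        // "connected" is written by SSEManager on subscribe; the terminal
        // event name ("done" here) is an assumption for illustration.
        if (payload.event === "done") {
            if (onDone) onDone(payload)
            source.close()
        }
    }

    source.onerror = (err) => {
        if (onError) onError(err)
    }

    return source
}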