Refactor file upload system with new transformations pipeline

This commit refactors the chunked upload system to support a transformation pipeline. Key changes include:

- Rename SSE fields for consistency (sseChannelId, sseUrl)
- Restructure progress reporting around state and percent fields (see the sketch below)
- Add transformation handlers (a-dash, mq-hls, img-compress, video-compress)
- Create a new Upload class with a clear separation of concerns
- Improve the file processing workflow with a better directory structure
- Fix a typo in the UploadButton component (progess → progress)
- Remove deprecated file processing services

SrGooglo 2025-04-24 06:06:21 +00:00
parent 369803534b
commit f62e885c65
32 changed files with 630 additions and 1188 deletions
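For context, the client-facing contract after this refactor looks roughly like the sketch below. The field names (event, state, percent, result) and the sse_events route come from the diffs in this commit; the host, channel id, and logging are placeholders, not part of the change itself.

// Hedged sketch of consuming the new SSE events emitted while an upload is processed.
// The URL shape follows the sseUrl built by the upload route; the values are made up.
const eventSource = new EventSource(
	"https://files.example.com/upload/sse_events/example-channel-id",
)

eventSource.onmessage = (event) => {
	const message = JSON.parse(event.data)

	if (message.event === "done") {
		// final payload: { event: "done", state: "done", result: { ... } }
		console.log("upload finished:", message.result)
		eventSource.close()
	} else if (message.event === "error") {
		// error payload: { event: "error", state: "error", result: errorMessage }
		console.error("upload failed:", message.result)
		eventSource.close()
	} else if (message.state) {
		// progress payload, e.g. { state: "transmuxing", percent: 42 }
		console.log(`${message.state}: ${message.percent}%`)
	}
}

The same { state, percent } object is what ChunkedUpload now forwards through its progress event, what RemoteStorage passes to onProgress, and what UploadButton reads via progress?.percent.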

View File

@@ -170,7 +170,7 @@ export default class ChunkedUpload {
// check if this is the last chunk; if so, handle SSE events
if (this.chunkCount === this.totalChunks) {
if (data.sseChannelId || data.eventChannelURL) {
if (data.sseChannelId || data.sseUrl) {
this.waitOnSSE(data)
} else {
this.events.emit("finish", data)
@@ -178,9 +178,8 @@ export default class ChunkedUpload {
}
this.events.emit("progress", {
percentProgress: Math.round(
(100 / this.totalChunks) * this.chunkCount,
),
percent: Math.round((100 / this.totalChunks) * this.chunkCount),
state: "Uploading",
})
} catch (error) {
this.events.emit("error", error)
@@ -196,12 +195,9 @@ export default class ChunkedUpload {
}
waitOnSSE(data) {
console.log(
`[UPLOADER] Connecting to SSE channel >`,
data.eventChannelURL,
)
console.log(`[UPLOADER] Connecting to SSE channel >`, data.sseUrl)
const eventSource = new EventSource(data.eventChannelURL)
const eventSource = new EventSource(data.sseUrl)
eventSource.onerror = (error) => {
this.events.emit("error", error)
@@ -218,19 +214,20 @@ export default class ChunkedUpload {
console.log(`[UPLOADER] SSE Event >`, messageData)
if (messageData.status === "done") {
if (messageData.event === "done") {
this.events.emit("finish", messageData.result)
eventSource.close()
}
if (messageData.status === "error") {
if (messageData.event === "error") {
this.events.emit("error", messageData.result)
eventSource.close()
}
if (messageData.status === "progress") {
if (messageData.state) {
this.events.emit("progress", {
percentProgress: messageData.progress,
percent: messageData.percent,
state: messageData.state,
})
}
}

View File

@@ -7,112 +7,102 @@ import { Icons } from "@components/Icons"
import "./index.less"
export default (props) => {
const [uploading, setUploading] = React.useState(false)
const [progess, setProgess] = React.useState(null)
const [uploading, setUploading] = React.useState(false)
const [progress, setProgress] = React.useState(null)
const handleOnStart = (file_uid, file) => {
if (typeof props.onStart === "function") {
props.onStart(file_uid, file)
}
}
const handleOnStart = (file_uid, file) => {
if (typeof props.onStart === "function") {
props.onStart(file_uid, file)
}
}
const handleOnProgress = (file_uid, progress) => {
if (typeof props.onProgress === "function") {
props.onProgress(file_uid, progress)
}
}
const handleOnProgress = (file_uid, progress) => {
if (typeof props.onProgress === "function") {
props.onProgress(file_uid, progress)
}
}
const handleOnError = (file_uid, error) => {
if (typeof props.onError === "function") {
props.onError(file_uid, error)
}
}
const handleOnError = (file_uid, error) => {
if (typeof props.onError === "function") {
props.onError(file_uid, error)
}
}
const handleOnSuccess = (file_uid, response) => {
if (typeof props.onSuccess === "function") {
props.onSuccess(file_uid, response)
}
}
const handleOnSuccess = (file_uid, response) => {
if (typeof props.onSuccess === "function") {
props.onSuccess(file_uid, response)
}
}
const handleUpload = async (req) => {
setUploading(true)
setProgess(1)
const handleUpload = async (req) => {
setUploading(true)
setProgress(1)
handleOnStart(req.file.uid, req.file)
handleOnStart(req.file.uid, req.file)
await app.cores.remoteStorage.uploadFile(req.file, {
headers: props.headers,
onProgress: (file, progress) => {
setProgess(progress)
handleOnProgress(file.uid, progress)
},
onError: (file, error) => {
setProgess(null)
handleOnError(file.uid, error)
setUploading(false)
},
onFinish: (file, response) => {
if (typeof props.ctx?.onUpdateItem === "function") {
props.ctx.onUpdateItem(response.url)
}
await app.cores.remoteStorage.uploadFile(req.file, {
headers: props.headers,
onProgress: (file, progress) => {
setProgress(progress)
handleOnProgress(file.uid, progress)
},
onError: (file, error) => {
setProgress(null)
handleOnError(file.uid, error)
setUploading(false)
},
onFinish: (file, response) => {
if (typeof props.ctx?.onUpdateItem === "function") {
props.ctx.onUpdateItem(response.url)
}
if (typeof props.onUploadDone === "function") {
props.onUploadDone(response)
}
if (typeof props.onUploadDone === "function") {
props.onUploadDone(response)
}
setUploading(false)
handleOnSuccess(req.file.uid, response)
setUploading(false)
handleOnSuccess(req.file.uid, response)
setTimeout(() => {
setProgess(null)
}, 1000)
},
})
}
setTimeout(() => {
setProgress(null)
}, 1000)
},
})
}
return <Upload
customRequest={handleUpload}
multiple={
props.multiple ?? false
}
accept={
props.accept ?? [
"image/*",
"video/*",
"audio/*",
]
}
progress={false}
fileList={[]}
className={classnames(
"uploadButton",
{
["uploading"]: !!progess || uploading
}
)}
disabled={uploading}
>
<div className="uploadButton-content">
{
!progess && (props.icon ?? <Icons.FiUpload
style={{
margin: 0
}}
/>)
}
return (
<Upload
customRequest={handleUpload}
multiple={props.multiple ?? false}
accept={props.accept ?? ["image/*", "video/*", "audio/*"]}
progress={false}
fileList={[]}
className={classnames("uploadButton", {
["uploading"]: !!progress || uploading,
})}
disabled={uploading}
>
<div className="uploadButton-content">
{!progress &&
(props.icon ?? (
<Icons.FiUpload
style={{
margin: 0,
}}
/>
))}
{
progess && <Progress
type="circle"
percent={progess}
strokeWidth={20}
format={() => null}
/>
}
{progress && (
<Progress
type="circle"
percent={progress?.percent ?? 0}
strokeWidth={20}
format={() => null}
/>
)}
{
props.children ?? "Upload"
}
</div>
</Upload>
}
{props.children ?? "Upload"}
</div>
</Upload>
)
}

View File

@@ -84,9 +84,9 @@ export default class RemoteStorage extends Core {
_reject(message)
})
uploader.events.on("progress", ({ percentProgress }) => {
uploader.events.on("progress", (data) => {
if (typeof onProgress === "function") {
onProgress(file, percentProgress)
onProgress(file, data)
}
})

View File

@@ -107,8 +107,7 @@ export async function handleChunkFile(
{ tmpDir, headers, maxFileSize, maxChunkSize },
) {
return await new Promise(async (resolve, reject) => {
const workPath = path.join(tmpDir, headers["uploader-file-id"])
const chunksPath = path.join(workPath, "chunks")
const chunksPath = path.join(tmpDir, "chunks")
const chunkPath = path.join(
chunksPath,
headers["uploader-chunk-number"],
@@ -188,7 +187,7 @@ export async function handleChunkFile(
// build data
chunksPath: chunksPath,
filePath: path.resolve(
workPath,
tmpDir,
`${filename}.${extension}`,
),
maxFileSize: maxFileSize,
@@ -207,38 +206,4 @@ export async function handleChunkFile(
})
}
export async function uploadChunkFile(
req,
{ tmpDir, maxFileSize, maxChunkSize },
) {
// create a readable stream from req.body data blob
//
const chunkData = new Blob([req.body], { type: "application/octet-stream" })
console.log(chunkData)
if (!checkChunkUploadHeaders(req.headers)) {
reject(new OperationError(400, "Missing header(s)"))
return
}
// return await new Promise(async (resolve, reject) => {
// // create a readable node stream from "req.body" (octet-stream)
// await req.multipart(async (field) => {
// try {
// const result = await handleChunkFile(field.file.stream, {
// tmpDir: tmpDir,
// headers: req.headers,
// maxFileSize: maxFileSize,
// maxChunkSize: maxChunkSize,
// })
// return resolve(result)
// } catch (error) {
// return reject(error)
// }
// })
// })
}
export default uploadChunkFile

View File

@@ -56,14 +56,12 @@ export default class TaskQueueManager {
registerQueueEvents = (worker) => {
worker.on("progress", (job, progress) => {
try {
console.log(`Job ${job.id} reported progress: ${progress}%`)
console.log(
`Job ${job.id} reported progress: ${progress.percent}%`,
)
if (job.data.sseChannelId) {
global.sse.sendToChannel(job.data.sseChannelId, {
status: "progress",
events: "job_progress",
progress,
})
global.sse.sendToChannel(job.data.sseChannelId, progress)
}
} catch (error) {
// handle error
@@ -76,8 +74,9 @@ export default class TaskQueueManager {
if (job.data.sseChannelId) {
global.sse.sendToChannel(job.data.sseChannelId, {
status: "done",
result,
event: "done",
state: "done",
result: result,
})
}
} catch (error) {}
@@ -89,7 +88,8 @@ export default class TaskQueueManager {
if (job.data.sseChannelId) {
global.sse.sendToChannel(job.data.sseChannelId, {
status: "error",
event: "error",
state: "error",
result: error.message,
})
}
@@ -122,9 +122,9 @@ export default class TaskQueueManager {
)
await global.sse.sendToChannel(sseChannelId, {
status: "progress",
events: "job_queued",
progress: 5,
event: "job_queued",
state: "progress",
percent: 5,
})
}

View File

@@ -104,11 +104,9 @@ export function createAssembleChunksPromise({
export async function handleChunkFile(
fileStream,
{ tmpDir, headers, maxFileSize, maxChunkSize },
{ chunksPath, outputDir, headers, maxFileSize, maxChunkSize },
) {
return await new Promise(async (resolve, reject) => {
const workPath = path.join(tmpDir, headers["uploader-file-id"])
const chunksPath = path.join(workPath, "chunks")
const chunkPath = path.join(
chunksPath,
headers["uploader-chunk-number"],
@@ -125,17 +123,6 @@ export async function handleChunkFile(
return reject(new OperationError(500, "Chunk is out of range"))
}
// if is the first chunk check if dir exists before write things
if (chunkCount === 0) {
try {
if (!(await fs.promises.stat(chunksPath).catch(() => false))) {
await fs.promises.mkdir(chunksPath, { recursive: true })
}
} catch (error) {
return reject(new OperationError(500, error.message))
}
}
let dataWritten = 0
let writeStream = fs.createWriteStream(chunkPath)
@@ -172,25 +159,18 @@ export async function handleChunkFile(
}
if (isLast) {
const mimetype = mimetypes.lookup(
headers["uploader-original-name"],
)
const extension = mimetypes.extension(mimetype)
// const mimetype = mimetypes.lookup(
// headers["uploader-original-name"],
// )
// const extension = mimetypes.extension(mimetype)
let filename = headers["uploader-file-id"]
if (headers["uploader-use-date"] === "true") {
filename = `${filename}_${Date.now()}`
}
let filename = nanoid()
return resolve(
createAssembleChunksPromise({
// build data
chunksPath: chunksPath,
filePath: path.resolve(
workPath,
`${filename}.${extension}`,
),
filePath: path.resolve(outputDir, filename),
maxFileSize: maxFileSize,
}),
)

View File

@@ -0,0 +1,38 @@
import path from "node:path"
import SegmentedAudioMPDJob from "@shared-classes/SegmentedAudioMPDJob"
export default async ({ filePath, workPath, onProgress }) => {
return new Promise(async (resolve, reject) => {
const outputDir = path.resolve(workPath, "a-dash")
const job = new SegmentedAudioMPDJob({
input: filePath,
outputDir: outputDir,
// keep defaults: raw FLAC output
audioCodec: "flac",
audioBitrate: "default",
audioSampleRate: "default",
})
job.on("start", () => {
console.log("A-DASH started")
})
job.on("end", (data) => {
console.log("A-DASH completed", data)
resolve(data)
})
job.on("progress", (progress) => {
if (typeof onProgress === "function") {
onProgress({
percent: progress,
state: "transmuxing",
})
}
})
job.run()
})
}

View File

@@ -0,0 +1,43 @@
import fs from "node:fs"
import path from "node:path"
import Sharp from "sharp"
const imageProcessingConf = {
sizeThreshold: 10 * 1024 * 1024,
imageQuality: 80,
}
const imageTypeToConfig = {
png: {
compressionLevel: Math.floor(imageProcessingConf.imageQuality / 100),
},
default: {
quality: imageProcessingConf.imageQuality,
},
}
export default async ({ filePath, workPath }) => {
const stat = await fs.promises.stat(filePath)
// ignore files below the size threshold
if (stat.size < imageProcessingConf.sizeThreshold) {
return { filePath: filePath }
}
let image = await Sharp(filePath)
const { format } = await image.metadata()
image = await image[format](
imageTypeToConfig[format] ?? imageTypeToConfig.default,
).withMetadata()
const outputFilePath = path.resolve(workPath, `${path.basename(filePath)}_ff`)
await image.toFile(outputFilePath)
return {
filePath: outputFilePath,
}
}

View File

@@ -0,0 +1,50 @@
import path from "node:path"
import MultiqualityHLSJob from "@shared-classes/MultiqualityHLSJob"
export default async ({ filePath, workPath, onProgress }) => {
return new Promise(async (resolve, reject) => {
const outputDir = path.resolve(workPath, "mqhls")
const job = new MultiqualityHLSJob({
input: filePath,
outputDir: outputDir,
// set default
outputMasterName: "master.m3u8",
levels: [
{
original: true,
codec: "libx264",
bitrate: "10M",
preset: "ultrafast",
},
{
codec: "libx264",
width: 1280,
bitrate: "3M",
preset: "ultrafast",
},
],
})
job.on("start", () => {
console.log("A-DASH started")
})
job.on("end", (data) => {
console.log("A-DASH completed", data)
resolve(data)
})
job.on("progress", (progress) => {
if (typeof onProgress === "function") {
onProgress({
percent: progress,
state: "transmuxing",
})
}
})
job.run()
})
}

View File

@@ -0,0 +1,6 @@
export default async ({ filePath, workPath }) => {
// TODO: Implement video compression logic
return {
filePath: filePath,
}
}

View File

@@ -0,0 +1,27 @@
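// Maps a transformation id (as sent in the "transformations" upload header) to its handler module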
const Handlers = {
"a-dash": require("./handlers/a-dash").default,
"mq-hls": require("./handlers/mq-hls").default,
"img-compress": require("./handlers/img-compress").default,
"video-compress": require("./handlers/video-compress").default,
}
export type TransformationPayloadType = {
filePath: string
workPath: string
handler: string
onProgress?: Function
}
class Transformation {
static async transform(payload: TransformationPayloadType) {
const handler = Handlers[payload.handler]
if (typeof handler !== "function") {
throw new Error(`Invalid handler: ${payload.handler}`)
}
return await handler(payload)
}
}
export default Transformation

View File

@@ -0,0 +1,107 @@
import fs from "node:fs"
import path from "node:path"
import mimeTypes from "mime-types"
import { fileTypeFromBuffer } from "file-type"
import readChunk from "@utils/readChunk"
import getFileHash from "@shared-utils/readFileHash"
import putObject from "./putObject"
import Transformation, { TransformationPayloadType } from "../Transformation"
export type FileHandlePayload = {
user_id: string
filePath: string
workPath: string
uploadId?: string
targetPath?: string
isDirectory?: boolean
transformations?: Array<string>
s3Provider?: string
onProgress?: Function
}
export type S3UploadPayload = {
filePath: string
basePath: string
targetPath?: string
}
export default class Upload {
static fileHandle = async (payload: FileHandlePayload) => {
// process
const processed = await Upload.process(payload)
// overwrite filePath
payload.filePath = processed.filePath
// upload
const result = await Upload.toS3({
filePath: payload.filePath,
targetPath: payload.targetPath,
basePath: payload.user_id,
})
// delete workpath
await fs.promises.rm(payload.workPath, { recursive: true, force: true })
return result
}
static process = async (payload: FileHandlePayload) => {
if (Array.isArray(payload.transformations)) {
for (const transformation of payload.transformations) {
const transformationResult = await Transformation.transform({
filePath: payload.filePath,
workPath: payload.workPath,
onProgress: payload.onProgress,
handler: transformation,
})
// if the result is a file, overwrite filePath
if (transformationResult.outputFile) {
payload.filePath = transformationResult.outputFile
}
// if the result is a directory, overwrite filePath to upload the entire directory
if (transformationResult.outputPath) {
payload.filePath = transformationResult.outputPath
payload.targetPath = transformationResult.outputFile
payload.isDirectory = true
}
}
}
return payload
}
static toS3 = async (payload: S3UploadPayload, onProgress?: Function) => {
const { filePath, basePath, targetPath } = payload
const firstBuffer = await readChunk(targetPath ?? filePath, { length: 4100 })
const fileHash = await getFileHash(fs.createReadStream(targetPath ?? filePath))
const fileType = await fileTypeFromBuffer(firstBuffer)
const uploadPath = path.join(basePath, path.basename(filePath))
const metadata = {
"File-Hash": fileHash,
"Content-Type": fileType.mime,
}
if (typeof onProgress === "function") {
onProgress({
percent: 0,
state: "uploading_s3",
})
}
const result = await putObject({
filePath: filePath,
uploadPath: uploadPath,
metadata: metadata,
targetFilename: targetPath ? path.basename(targetPath) : null,
})
return result
}
}

View File

@@ -0,0 +1,58 @@
import fs from "node:fs"
import path from "node:path"
import pMap from "p-map"
export default async function standardUpload({
filePath,
uploadPath,
metadata = {},
targetFilename,
onFinish,
}) {
const isDirectory = await fs.promises
.lstat(filePath)
.then((stats) => stats.isDirectory())
if (isDirectory) {
let files = await fs.promises.readdir(filePath)
files = files.map((file) => {
const newPath = path.join(filePath, file)
return {
filePath: newPath,
uploadPath: path.join(uploadPath, file),
}
})
await pMap(files, standardUpload, {
concurrency: 3,
})
return {
id: uploadPath,
url: global.storage.composeRemoteURL(uploadPath, targetFilename),
metadata: metadata,
}
}
// upload to storage
await global.storage.fPutObject(
process.env.S3_BUCKET,
uploadPath,
filePath,
metadata,
)
const result = {
id: uploadPath,
url: global.storage.composeRemoteURL(uploadPath),
metadata: metadata,
}
if (typeof onFinish === "function") {
await onFinish(result)
}
return result
}

View File

@@ -3,6 +3,7 @@ import { Server } from "linebridge"
import B2 from "backblaze-b2"
import DbManager from "@shared-classes/DbManager"
import RedisClient from "@shared-classes/RedisClient"
import StorageClient from "@shared-classes/StorageClient"
import CacheService from "@shared-classes/CacheService"
import SSEManager from "@shared-classes/SSEManager"
@@ -10,9 +11,21 @@ import SharedMiddlewares from "@shared-middlewares"
import LimitsClass from "@shared-classes/Limits"
import TaskQueueManager from "@shared-classes/TaskQueueManager"
// import * as Minio from 'minio'
// class StorageNG {
// constructor() {
// }
// async initialize() {
// }
// }
class API extends Server {
static refName = "files"
static useEngine = "hyper-express"
static useEngine = "hyper-express-ng"
static routesPath = `${__dirname}/routes`
static listen_port = process.env.HTTP_LISTEN_PORT ?? 3002
static enableWebsockets = true
@@ -27,6 +40,9 @@ class API extends Server {
storage: StorageClient(),
b2Storage: null,
SSEManager: new SSEManager(),
redis: RedisClient({
maxRetriesPerRequest: null,
}),
limits: {},
}
@@ -55,8 +71,9 @@ class API extends Server {
)
}
await this.contexts.redis.initialize()
await this.queuesManager.initialize({
redisOptions: this.engine.ws.redis.options,
redisOptions: this.contexts.redis.client,
})
await this.contexts.db.initialize()
await this.contexts.storage.initialize()

View File

@@ -1,20 +1,21 @@
{
"name": "files",
"version": "0.60.2",
"dependencies": {
"backblaze-b2": "^1.7.0",
"busboy": "^1.6.0",
"content-range": "^2.0.2",
"ffmpeg-static": "^5.2.0",
"fluent-ffmpeg": "^2.1.2",
"merge-files": "^0.1.2",
"mime-types": "^2.1.35",
"minio": "^7.0.32",
"normalize-url": "^8.0.0",
"p-map": "4",
"p-queue": "^7.3.4",
"redis": "^4.6.6",
"sharp": "0.32.6",
"split-chunk-merge": "^1.0.0"
}
"name": "files",
"version": "0.60.2",
"dependencies": {
"backblaze-b2": "^1.7.0",
"busboy": "^1.6.0",
"content-range": "^2.0.2",
"ffmpeg-static": "^5.2.0",
"file-type": "^20.4.1",
"fluent-ffmpeg": "^2.1.2",
"merge-files": "^0.1.2",
"mime-types": "^2.1.35",
"minio": "^7.0.32",
"normalize-url": "^8.0.0",
"p-map": "4",
"p-queue": "^7.3.4",
"redis": "^4.6.6",
"sharp": "0.32.6",
"split-chunk-merge": "^1.0.0"
}
}

View File

@@ -1,49 +0,0 @@
import path from "node:path"
import fs from "node:fs"
import RemoteUpload from "@services/remoteUpload"
export default {
id: "remote_upload",
maxJobs: 10,
process: async (job) => {
const {
filePath,
parentDir,
service,
useCompression,
cachePath,
transmux,
transmuxOptions,
} = job.data
console.log("[JOB][remote_upload] Processing job >", job.data)
try {
const result = await RemoteUpload({
parentDir: parentDir,
source: filePath,
service: service,
useCompression: useCompression,
transmux: transmux,
transmuxOptions: transmuxOptions,
cachePath: cachePath,
onProgress: (progress) => {
job.updateProgress(progress)
},
})
await fs.promises
.rm(filePath, { recursive: true, force: true })
.catch(() => null)
return result
} catch (error) {
await fs.promises
.rm(filePath, { recursive: true, force: true })
.catch(() => null)
throw error
}
},
}

View File

@@ -1,43 +0,0 @@
import RemoteUpload from "@services/remoteUpload"
import fs from "node:fs"
module.exports = async (job) => {
const {
filePath,
parentDir,
service,
useCompression,
cachePath,
transmux,
transmuxOptions,
} = job.data
console.log("[JOB][remote_upload] Processing job >", job.data)
try {
const result = await RemoteUpload({
parentDir: parentDir,
source: filePath,
service: service,
useCompression: useCompression,
transmux: transmux,
transmuxOptions: transmuxOptions,
cachePath: cachePath,
onProgress: (progress) => {
job.progress(progress)
},
})
await fs.promises
.rm(filePath, { recursive: true, force: true })
.catch(() => null)
return result
} catch (error) {
await fs.promises
.rm(filePath, { recursive: true, force: true })
.catch(() => null)
throw error
}
}

View File

@@ -0,0 +1,29 @@
import path from "node:path"
import fs from "node:fs"
import Upload from "@classes/Upload"
export default {
id: "file-process",
maxJobs: 2,
process: async (job) => {
console.log("[JOB][file-process] starting... >", job.data)
try {
const result = await Upload.fileHandle({
...job.data,
onProgress: (progress) => {
job.updateProgress(progress)
},
})
return result
} catch (error) {
await fs.promises
.rm(job.data.workPath, { recursive: true, force: true })
.catch(() => null)
throw error
}
},
}

View File

@@ -1,88 +0,0 @@
import path from "node:path"
import fs from "node:fs"
import axios from "axios"
import MultiqualityHLSJob from "@shared-classes/MultiqualityHLSJob"
import { standardUpload } from "@services/remoteUpload"
export default {
useContext: ["cache", "limits"],
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const { url } = req.query
const userPath = path.join(this.default.contexts.cache.constructor.cachePath, req.auth.session.user_id)
const jobId = String(new Date().getTime())
const jobPath = path.resolve(userPath, "jobs", jobId)
const sourcePath = path.resolve(jobPath, `${jobId}.source`)
if (!fs.existsSync(jobPath)) {
fs.mkdirSync(jobPath, { recursive: true })
}
const sourceStream = fs.createWriteStream(sourcePath)
const response = await axios({
method: "get",
url,
responseType: "stream",
})
response.data.pipe(sourceStream)
await new Promise((resolve, reject) => {
sourceStream.on("finish", () => {
resolve()
})
sourceStream.on("error", (err) => {
reject(err)
})
})
const job = new MultiqualityHLSJob({
input: sourcePath,
outputDir: jobPath,
levels: [
{
original: true,
codec: "libx264",
bitrate: "10M",
preset: "ultrafast",
},
{
codec: "libx264",
width: 1280,
bitrate: "3M",
preset: "ultrafast",
}
]
})
await new Promise((resolve, reject) => {
job
.on("error", (err) => {
console.error(`[TRANSMUX] Transmuxing failed`, err)
reject(err)
})
.on("end", () => {
console.debug(`[TRANSMUX] Finished transmuxing > ${sourcePath}`)
resolve()
})
.run()
})
const result = await standardUpload({
isDirectory: true,
source: path.join(jobPath, "hls"),
remotePath: `${req.auth.session.user_id}/jobs/${jobId}`,
})
fs.rmSync(jobPath, { recursive: true, force: true })
return {
result
}
}
}

View File

@@ -1,21 +1,12 @@
import { Duplex } from "node:stream"
import path from "node:path"
import fs from "node:fs"
import RemoteUpload from "@services/remoteUpload"
import {
checkChunkUploadHeaders,
handleChunkFile,
} from "@classes/ChunkFileUpload"
import { checkChunkUploadHeaders, handleChunkFile } from "@classes/ChunkFile"
import Upload from "@classes/Upload"
import bufferToStream from "@utils/bufferToStream"
const availableProviders = ["b2", "standard"]
function bufferToStream(bf) {
let tmp = new Duplex()
tmp.push(bf)
tmp.push(null)
return tmp
}
export default {
useContext: ["cache", "limits"],
middlewares: ["withAuthentication"],
@@ -25,14 +16,16 @@ export default {
return
}
const uploadId = `${req.headers["uploader-file-id"]}_${Date.now()}`
const uploadId = `${req.headers["uploader-file-id"]}`
const tmpPath = path.resolve(
const workPath = path.resolve(
this.default.contexts.cache.constructor.cachePath,
req.auth.session.user_id,
`${req.auth.session.user_id}-${uploadId}`,
)
const chunksPath = path.join(workPath, "chunks")
const assembledPath = path.join(workPath, "assembled")
const limits = {
const config = {
maxFileSize:
parseInt(this.default.contexts.limits.maxFileSizeInMB) *
1024 *
@@ -46,88 +39,82 @@ export default {
}
// const user = await req.auth.user()
// if (user.roles.includes("admin")) {
// // maxFileSize for admins 100GB
// limits.maxFileSize = 100 * 1024 * 1024 * 1024
// // optional compression for admins
// limits.useCompression = req.headers["use-compression"] ?? false
// limits.useProvider = req.headers["provider-type"] ?? "b2"
// }
// check if provider is valid
if (!availableProviders.includes(limits.useProvider)) {
if (!availableProviders.includes(config.useProvider)) {
throw new OperationError(400, "Invalid provider")
}
// create a readable stream from req.body(buffer)
await fs.promises.mkdir(workPath, { recursive: true })
await fs.promises.mkdir(chunksPath, { recursive: true })
await fs.promises.mkdir(assembledPath, { recursive: true })
// create a readable stream
const dataStream = bufferToStream(await req.buffer())
let result = await handleChunkFile(dataStream, {
tmpDir: tmpPath,
let assemble = await handleChunkFile(dataStream, {
chunksPath: chunksPath,
outputDir: assembledPath,
headers: req.headers,
maxFileSize: limits.maxFileSize,
maxChunkSize: limits.maxChunkSize,
maxFileSize: config.maxFileSize,
maxChunkSize: config.maxChunkSize,
})
if (typeof result === "function") {
try {
result = await result()
const useJob = true
if (req.headers["transmux"] || limits.useCompression === true) {
// add a background task
if (typeof assemble === "function") {
try {
assemble = await assemble()
let transformations = req.headers["transformations"]
if (transformations) {
transformations = transformations
.split(",")
.map((t) => t.trim())
}
const payload = {
user_id: req.auth.session.user_id,
uploadId: uploadId,
filePath: assemble.filePath,
workPath: workPath,
transformations: transformations,
}
// if transformations were requested, use a background job
if (transformations && transformations.length > 0) {
const job = await global.queues.createJob(
"remote_upload",
{
filePath: result.filePath,
parentDir: req.auth.session.user_id,
service: limits.useProvider,
useCompression: limits.useCompression,
transmux: req.headers["transmux"] ?? false,
transmuxOptions: req.headers["transmux-options"],
cachePath: tmpPath,
},
"file-process",
payload,
{
useSSE: true,
},
)
const sseChannelId = job.sseChannelId
return {
uploadId: uploadId,
sseChannelId: sseChannelId,
eventChannelURL: `${req.headers["x-forwarded-proto"] || req.protocol}://${req.get("host")}/upload/sse_events/${sseChannelId}`,
uploadId: payload.uploadId,
sseChannelId: job.sseChannelId,
sseUrl: `${req.headers["x-forwarded-proto"] || req.protocol}://${req.get("host")}/upload/sse_events/${job.sseChannelId}`,
}
} else {
const result = await RemoteUpload({
source: result.filePath,
parentDir: req.auth.session.user_id,
service: limits.useProvider,
useCompression: limits.useCompression,
cachePath: tmpPath,
})
return result
}
} catch (error) {
await fs.promises
.rm(tmpPath, { recursive: true, force: true })
.catch(() => {
return false
})
throw new OperationError(
error.code ?? 500,
error.message ?? "Failed to upload file",
)
return await Upload.fileHandle(payload)
} catch (error) {
await fs.promises.rm(workPath, { recursive: true })
throw error
}
}
return {
ok: 1,
next: true,
chunkNumber: req.headers["uploader-chunk-number"],
}
},

View File

@@ -1,48 +1,44 @@
import path from "node:path"
import fs from "node:fs"
import RemoteUpload from "@services/remoteUpload"
import Upload from "@classes/Upload"
export default {
useContext: ["cache"],
middlewares: [
"withAuthentication",
],
fn: async (req, res) => {
const { cache } = this.default.contexts
useContext: ["cache"],
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const workPath = path.resolve(
this.default.contexts.cache.constructor.cachePath,
`${req.auth.session.user_id}-${nanoid()}`,
)
const providerType = req.headers["provider-type"] ?? "standard"
await fs.promises.mkdir(workPath, { recursive: true })
const userPath = path.join(cache.constructor.cachePath, req.auth.session.user_id)
let localFilepath = null
let localFilepath = null
let tmpPath = path.resolve(userPath, `${Date.now()}`)
await req.multipart(async (field) => {
if (!field.file) {
throw new OperationError(400, "Missing file")
}
await req.multipart(async (field) => {
if (!field.file) {
throw new OperationError(400, "Missing file")
}
localFilepath = path.join(workPath, "file")
localFilepath = path.join(tmpPath, field.file.name)
await field.write(localFilepath)
})
const existTmpDir = await fs.promises.stat(tmpPath).then(() => true).catch(() => false)
let transformations = req.headers["transformations"]
if (!existTmpDir) {
await fs.promises.mkdir(tmpPath, { recursive: true })
}
if (transformations) {
transformations = transformations.split(",").map((t) => t.trim())
}
await field.write(localFilepath)
})
const result = await Upload.fileHandle({
user_id: req.auth.session.user_id,
filePath: localFilepath,
workPath: workPath,
transformations: transformations,
})
const result = await RemoteUpload({
parentDir: req.auth.session.user_id,
source: localFilepath,
service: providerType,
useCompression: ToBoolean(req.headers["use-compression"]) ?? true,
})
fs.promises.rm(tmpPath, { recursive: true, force: true })
return result
}
return result
},
}

View File

@@ -1,30 +0,0 @@
const ffmpeg = require("fluent-ffmpeg")
export default async (file) => {
// analize metadata
let metadata = await new Promise((resolve, reject) => {
ffmpeg.ffprobe(file.filepath, (err, data) => {
if (err) {
return reject(err)
}
resolve(data)
})
}).catch((err) => {
console.error(err)
return {}
})
if (metadata.format) {
metadata = metadata.format
}
file.metadata = {
duration: metadata.duration,
bitrate: metadata.bit_rate,
size: metadata.size,
}
return file
}

View File

@@ -1,59 +0,0 @@
import fs from "node:fs"
import path from "node:path"
import Sharp from "sharp"
const imageProcessingConf = {
// TODO: Get image sizeThreshold from DB
sizeThreshold: 10 * 1024 * 1024,
// TODO: Get image quality from DB
imageQuality: 80,
}
const imageTypeToConfig = {
png: {
compressionLevel: Math.floor(imageProcessingConf.imageQuality / 100),
},
default: {
quality: imageProcessingConf.imageQuality
}
}
/**
* Processes an image file and transforms it if it's above a certain size threshold.
*
* @async
* @function
* @param {Object} file - The file to be processed.
* @param {string} file.filepath - The path of the file to be processed.
* @param {string} file.hash - The hash of the file to be processed.
* @param {string} file.cachePath - The cache path of the file to be processed.
* @throws {Error} If the file parameter is not provided.
* @return {Object} The processed file object.
*/
async function processImage(file) {
if (!file) {
throw new Error("file is required")
}
const stat = await fs.promises.stat(file.filepath)
if (stat.size < imageProcessingConf.sizeThreshold) {
return file
}
let image = await Sharp(file.filepath)
const { format } = await image.metadata()
image = await image[format](imageTypeToConfig[format] ?? imageTypeToConfig.default).withMetadata()
const outputFilepath = path.resolve(file.cachePath, `${file.hash}_transformed.${format}`)
await image.toFile(outputFilepath)
file.filepath = outputFilepath
return file
}
export default processImage

View File

@@ -1,53 +0,0 @@
import fs from "node:fs"
import mimetypes from "mime-types"
import processVideo from "./video"
import processImage from "./image"
import processAudio from "./audio"
const fileTransformer = {
// video
"video/avi": processVideo,
"video/quicktime": processVideo,
"video/mp4": processVideo,
"video/webm": processVideo,
//image
"image/jpeg": processImage,
"image/png": processImage,
"image/gif": processImage,
"image/bmp": processImage,
"image/tiff": processImage,
"image/webp": processImage,
"image/jfif": processImage,
// audio
"audio/flac": processAudio,
"audio/x-flac": processAudio,
"audio/mp3": processAudio,
"audio/x-mp3": processAudio,
"audio/mpeg": processAudio,
"audio/x-mpeg": processAudio,
"audio/ogg": processAudio,
"audio/x-ogg": processAudio,
"audio/wav": processAudio,
"audio/x-wav": processAudio,
}
export default async (file) => {
if (!file) {
throw new Error("file is required")
}
if (!fs.existsSync(file.filepath)) {
throw new Error(`File ${file.filepath} not found`)
}
const fileMimetype = mimetypes.lookup(file.filepath)
if (typeof fileTransformer[fileMimetype] !== "function") {
console.debug(`File (${file.filepath}) has mimetype ${fileMimetype} and will not be processed`)
return file
}
return await fileTransformer[fileMimetype](file)
}

View File

@@ -1,43 +0,0 @@
import videoTranscode from "@services/videoTranscode"
/**
* Processes a video file based on the specified options.
*
* @async
* @param {Object} file - The video file to process.
* @param {Object} [options={}] - The options object to use for processing.
* @param {string} [options.videoCodec="libx264"] - The video codec to use.
* @param {string} [options.format="mp4"] - The format to use.
* @param {number} [options.audioBitrate=128] - The audio bitrate to use.
* @param {number} [options.videoBitrate=2024] - The video bitrate to use.
* @throws {Error} Throws an error if file parameter is not provided.
* @return {Object} The processed video file object.
*/
async function processVideo(file, options = {}) {
if (!file) {
throw new Error("file is required")
}
// TODO: Get values from db
const {
videoCodec = "libx264",
format = "mp4",
audioBitrate = 128,
videoBitrate = 3000,
} = options
const result = await videoTranscode(file.filepath, {
videoCodec,
format,
audioBitrate,
videoBitrate: [videoBitrate, true],
extraOptions: ["-threads 2"],
})
file.filepath = result.filepath
file.filename = result.filename
return file
}
export default processVideo

View File

@@ -1,162 +0,0 @@
import fs from "node:fs"
import path from "node:path"
import mimeTypes from "mime-types"
import getFileHash from "@shared-utils/readFileHash"
import PostProcess from "../post-process"
import Transmux from "../transmux"
import StandardUpload from "./providers/standard"
import B2Upload from "./providers/b2"
export default async ({
source,
parentDir,
service,
useCompression,
cachePath,
transmux,
transmuxOptions,
isDirectory,
onProgress,
}) => {
if (!source) {
throw new OperationError(500, "source is required")
}
if (!service) {
service = "standard"
}
if (!parentDir) {
parentDir = "/"
}
if (transmuxOptions) {
transmuxOptions = JSON.parse(transmuxOptions)
}
if (useCompression) {
if (typeof onProgress === "function") {
onProgress(10, {
event: "post_processing",
})
}
try {
const processOutput = await PostProcess({
filepath: source,
cachePath: cachePath,
})
if (processOutput) {
if (processOutput.filepath) {
source = processOutput.filepath
}
}
} catch (error) {
console.error(error)
throw new OperationError(500, `Failed to process file`)
}
}
if (transmux) {
if (typeof onProgress === "function") {
onProgress(30, {
event: "transmuxing",
})
}
try {
const processOutput = await Transmux({
transmuxer: transmux,
transmuxOptions: transmuxOptions,
filepath: source,
cachePath: cachePath,
})
if (processOutput) {
if (processOutput.filepath) {
source = processOutput.filepath
}
if (processOutput.isDirectory) {
isDirectory = true
}
}
} catch (error) {
console.error(error)
throw new OperationError(500, `Failed to transmux file`)
}
}
const type = mimeTypes.lookup(path.basename(source))
const hash = await getFileHash(fs.createReadStream(source))
let fileId = `${hash}`
// FIXME: This is a walkaround to avoid to hashing the entire directories
if (isDirectory) {
fileId = global.nanoid()
}
let remotePath = path.join(parentDir, fileId)
let result = {}
const metadata = {
"Content-Type": type,
"File-Hash": hash,
}
if (typeof onProgress === "function") {
onProgress(80, {
event: "uploading_s3",
service: service,
})
}
try {
switch (service) {
case "b2":
if (!global.b2Storage) {
throw new OperationError(
500,
"B2 storage not configured on environment, unsupported service. Please use `standard` service.",
)
}
result = await B2Upload({
source: isDirectory ? path.dirname(source) : source,
remotePath: remotePath,
metadata: metadata,
isDirectory: isDirectory,
targetFilename: isDirectory ? path.basename(source) : null,
})
break
case "standard":
result = await StandardUpload({
source: isDirectory ? path.dirname(source) : source,
remotePath: remotePath,
metadata: metadata,
isDirectory: isDirectory,
targetFilename: isDirectory ? path.basename(source) : null,
})
break
default:
throw new OperationError(500, "Unsupported service")
}
} catch (error) {
console.error(error)
throw new OperationError(500, "Failed to upload to storage")
}
if (typeof onProgress === "function") {
onProgress(100, {
event: "done",
result: result,
})
}
return result
}

View File

@@ -1,90 +0,0 @@
import fs from "node:fs"
import path from "node:path"
import pMap from "p-map"
export default async function b2Upload({
source,
remotePath,
metadata = {},
targetFilename,
isDirectory,
retryNumber = 0
}) {
if (isDirectory) {
let files = await fs.promises.readdir(source)
files = files.map((file) => {
const filePath = path.join(source, file)
const isTargetDirectory = fs.lstatSync(filePath).isDirectory()
return {
source: filePath,
remotePath: path.join(remotePath, file),
isDirectory: isTargetDirectory,
}
})
await pMap(
files,
b2Upload,
{
concurrency: 5
}
)
return {
id: remotePath,
url: `https://${process.env.B2_CDN_ENDPOINT}/${process.env.B2_BUCKET}/${remotePath}/${targetFilename}`,
metadata: metadata,
}
}
try {
await global.b2Storage.authorize()
if (!fs.existsSync(source)) {
throw new OperationError(500, "File not found")
}
const uploadUrl = await global.b2Storage.getUploadUrl({
bucketId: process.env.B2_BUCKET_ID,
})
console.debug(`Uploading object to B2 Storage >`, {
source: source,
remote: remotePath,
})
const data = await fs.promises.readFile(source)
await global.b2Storage.uploadFile({
uploadUrl: uploadUrl.data.uploadUrl,
uploadAuthToken: uploadUrl.data.authorizationToken,
fileName: remotePath,
data: data,
info: metadata
})
} catch (error) {
console.error(error)
if (retryNumber < 5) {
return await b2Upload({
source,
remotePath,
metadata,
targetFilename,
isDirectory,
retryNumber: retryNumber + 1
})
}
throw new OperationError(500, "B2 upload failed")
}
return {
id: remotePath,
url: `https://${process.env.B2_CDN_ENDPOINT}/${process.env.B2_BUCKET}/${remotePath}`,
metadata: metadata,
}
}

View File

@@ -1,58 +0,0 @@
import fs from "node:fs"
import path from "node:path"
import pMap from "p-map"
export default async function standardUpload({
source,
remotePath,
metadata = {},
targetFilename,
isDirectory,
}) {
if (isDirectory) {
let files = await fs.promises.readdir(source)
files = files.map((file) => {
const filePath = path.join(source, file)
const isTargetDirectory = fs.lstatSync(filePath).isDirectory()
return {
source: filePath,
remotePath: path.join(remotePath, file),
isDirectory: isTargetDirectory,
}
})
await pMap(
files,
standardUpload,
{
concurrency: 3
}
)
return {
id: remotePath,
url: global.storage.composeRemoteURL(remotePath, targetFilename),
metadata: metadata,
}
}
console.debug(`Uploading object to S3 Minio >`, {
source: source,
remote: remotePath,
})
// upload to storage
await global.storage.fPutObject(process.env.S3_BUCKET, remotePath, source, metadata)
// compose url
const url = global.storage.composeRemoteURL(remotePath)
return {
id: remotePath,
url: url,
metadata: metadata,
}
}

View File

@@ -1,108 +0,0 @@
import fs from "node:fs"
import path from "node:path"
import MultiqualityHLSJob from "@shared-classes/MultiqualityHLSJob"
import SegmentedAudioMPDJob from "@shared-classes/SegmentedAudioMPDJob"
const transmuxers = [
{
id: "mq-hls",
container: "hls",
extension: "m3u8",
multipleOutput: true,
buildCommand: (input, outputDir) => {
return new MultiqualityHLSJob({
input: input,
outputDir: outputDir,
outputMasterName: "master.m3u8",
levels: [
{
original: true,
codec: "libx264",
bitrate: "10M",
preset: "ultrafast",
},
{
codec: "libx264",
width: 1280,
bitrate: "3M",
preset: "ultrafast",
}
]
})
},
},
{
id: "a-dash",
container: "dash",
extension: "mpd",
multipleOutput: true,
buildCommand: (input, outputDir) => {
return new SegmentedAudioMPDJob({
input: input,
outputDir: outputDir,
outputMasterName: "master.mpd",
audioCodec: "flac",
//audioBitrate: "1600k",
//audioSampleRate: 96000,
segmentTime: 10,
})
}
},
]
export default async (params) => {
if (!params) {
throw new Error("params is required")
}
if (!params.filepath) {
throw new Error("filepath is required")
}
if (!params.cachePath) {
throw new Error("cachePath is required")
}
if (!params.transmuxer) {
throw new Error("transmuxer is required")
}
if (!fs.existsSync(params.filepath)) {
throw new Error(`File ${params.filepath} not found`)
}
const transmuxer = transmuxers.find((item) => item.id === params.transmuxer)
if (!transmuxer) {
throw new Error(`Transmuxer ${params.transmuxer} not found`)
}
const jobPath = path.dirname(params.filepath)
if (!fs.existsSync(path.dirname(jobPath))) {
fs.mkdirSync(path.dirname(jobPath), { recursive: true })
}
return await new Promise((resolve, reject) => {
try {
const command = transmuxer.buildCommand(params.filepath, jobPath)
command
.on("progress", function (progress) {
console.log("Processing: " + progress.percent + "% done")
})
.on("error", (err) => {
reject(err)
})
.on("end", (data) => {
resolve(data)
})
.run()
} catch (error) {
console.error(`[TRANSMUX] Transmuxing failed`, error)
reject(error)
}
})
}

View File

@@ -1,98 +0,0 @@
import path from "path"
const ffmpeg = require("fluent-ffmpeg")
const defaultParams = {
audioBitrate: 128,
videoBitrate: 1024,
videoCodec: "libvpx",
audioCodec: "libvorbis",
format: "mp4",
}
const maxTasks = 5
export default (input, params = defaultParams) => {
return new Promise((resolve, reject) => {
if (!global.ffmpegTasks) {
global.ffmpegTasks = []
}
if (global.ffmpegTasks.length >= maxTasks) {
return reject(new Error("Too many transcoding tasks"))
}
const outputFilename = `${path.basename(input).split(".")[0]}_ff.${params.format ?? "webm"}`
const outputFilepath = `${path.dirname(input)}/${outputFilename}`
console.debug(`[TRANSCODING] Transcoding ${input} to ${outputFilepath}`)
const onEnd = async () => {
console.debug(
`[TRANSCODING] Finished transcode ${input} to ${outputFilepath}`,
)
return resolve({
filename: outputFilename,
filepath: outputFilepath,
})
}
const onError = (err) => {
console.error(
`[TRANSCODING] Transcoding ${input} to ${outputFilepath} failed`,
err,
)
return reject(err)
}
let exec = null
const commands = {
input: input,
...params,
output: outputFilepath,
outputOptions: ["-preset veryfast"],
}
// chain methods
for (let key in commands) {
if (exec === null) {
exec = ffmpeg(commands[key])
continue
}
if (key === "extraOptions" && Array.isArray(commands[key])) {
for (const option of commands[key]) {
exec = exec.inputOptions(option)
}
continue
}
if (key === "outputOptions" && Array.isArray(commands[key])) {
for (const option of commands[key]) {
exec = exec.outputOptions(option)
}
continue
}
if (typeof exec[key] !== "function") {
console.warn(`[TRANSCODING] Method ${key} is not a function`)
return false
}
if (Array.isArray(commands[key])) {
exec = exec[key](...commands[key])
} else {
exec = exec[key](commands[key])
}
continue
}
exec.on("error", onError).on("end", onEnd).run()
})
}

View File

@@ -0,0 +1,10 @@
import { Duplex } from "node:stream"
export default (bf) => {
let tmp = new Duplex()
tmp.push(bf)
tmp.push(null)
return tmp
}

View File

@@ -0,0 +1,22 @@
// Original fork from https://github.com/sindresorhus/read-chunk
import { open } from "node:fs/promises"
export default async (filePath, { length, startPosition }) => {
const fileDescriptor = await open(filePath, "r")
try {
let { bytesRead, buffer } = await fileDescriptor.read({
buffer: new Uint8Array(length),
length,
position: startPosition,
})
if (bytesRead < length) {
buffer = buffer.subarray(0, bytesRead)
}
return buffer
} finally {
await fileDescriptor?.close()
}
}