diff --git a/packages/app/src/classes/ChunkedUpload/index.js b/packages/app/src/classes/ChunkedUpload/index.js index 6d6b3441..5e93c5ab 100644 --- a/packages/app/src/classes/ChunkedUpload/index.js +++ b/packages/app/src/classes/ChunkedUpload/index.js @@ -170,7 +170,7 @@ export default class ChunkedUpload { // check if is the last chunk, if so, handle sse events if (this.chunkCount === this.totalChunks) { - if (data.sseChannelId || data.eventChannelURL) { + if (data.sseChannelId || data.sseUrl) { this.waitOnSSE(data) } else { this.events.emit("finish", data) @@ -178,9 +178,8 @@ export default class ChunkedUpload { } this.events.emit("progress", { - percentProgress: Math.round( - (100 / this.totalChunks) * this.chunkCount, - ), + percent: Math.round((100 / this.totalChunks) * this.chunkCount), + state: "Uploading", }) } catch (error) { this.events.emit("error", error) @@ -196,12 +195,9 @@ export default class ChunkedUpload { } waitOnSSE(data) { - console.log( - `[UPLOADER] Connecting to SSE channel >`, - data.eventChannelURL, - ) + console.log(`[UPLOADER] Connecting to SSE channel >`, data.sseUrl) - const eventSource = new EventSource(data.eventChannelURL) + const eventSource = new EventSource(data.sseUrl) eventSource.onerror = (error) => { this.events.emit("error", error) @@ -218,19 +214,20 @@ export default class ChunkedUpload { console.log(`[UPLOADER] SSE Event >`, messageData) - if (messageData.status === "done") { + if (messageData.event === "done") { this.events.emit("finish", messageData.result) eventSource.close() } - if (messageData.status === "error") { + if (messageData.event === "error") { this.events.emit("error", messageData.result) eventSource.close() } - if (messageData.status === "progress") { + if (messageData.state) { this.events.emit("progress", { - percentProgress: messageData.progress, + percent: messageData.percent, + state: messageData.state, }) } } diff --git a/packages/app/src/components/UploadButton/index.jsx b/packages/app/src/components/UploadButton/index.jsx index 0a7caee1..90ca59e4 100755 --- a/packages/app/src/components/UploadButton/index.jsx +++ b/packages/app/src/components/UploadButton/index.jsx @@ -7,112 +7,102 @@ import { Icons } from "@components/Icons" import "./index.less" export default (props) => { - const [uploading, setUploading] = React.useState(false) - const [progess, setProgess] = React.useState(null) + const [uploading, setUploading] = React.useState(false) + const [progress, setProgress] = React.useState(null) - const handleOnStart = (file_uid, file) => { - if (typeof props.onStart === "function") { - props.onStart(file_uid, file) - } - } + const handleOnStart = (file_uid, file) => { + if (typeof props.onStart === "function") { + props.onStart(file_uid, file) + } + } - const handleOnProgress = (file_uid, progress) => { - if (typeof props.onProgress === "function") { - props.onProgress(file_uid, progress) - } - } + const handleOnProgress = (file_uid, progress) => { + if (typeof props.onProgress === "function") { + props.onProgress(file_uid, progress) + } + } - const handleOnError = (file_uid, error) => { - if (typeof props.onError === "function") { - props.onError(file_uid, error) - } - } + const handleOnError = (file_uid, error) => { + if (typeof props.onError === "function") { + props.onError(file_uid, error) + } + } - const handleOnSuccess = (file_uid, response) => { - if (typeof props.onSuccess === "function") { - props.onSuccess(file_uid, response) - } - } + const handleOnSuccess = (file_uid, response) => { + if (typeof props.onSuccess === "function") { + props.onSuccess(file_uid, response) + } + } - const handleUpload = async (req) => { - setUploading(true) - setProgess(1) + const handleUpload = async (req) => { + setUploading(true) + setProgress(1) - handleOnStart(req.file.uid, req.file) + handleOnStart(req.file.uid, req.file) - await app.cores.remoteStorage.uploadFile(req.file, { - headers: props.headers, - onProgress: (file, progress) => { - setProgess(progress) - handleOnProgress(file.uid, progress) - }, - onError: (file, error) => { - setProgess(null) - handleOnError(file.uid, error) - setUploading(false) - }, - onFinish: (file, response) => { - if (typeof props.ctx?.onUpdateItem === "function") { - props.ctx.onUpdateItem(response.url) - } + await app.cores.remoteStorage.uploadFile(req.file, { + headers: props.headers, + onProgress: (file, progress) => { + setProgress(progress) + handleOnProgress(file.uid, progress) + }, + onError: (file, error) => { + setProgress(null) + handleOnError(file.uid, error) + setUploading(false) + }, + onFinish: (file, response) => { + if (typeof props.ctx?.onUpdateItem === "function") { + props.ctx.onUpdateItem(response.url) + } - if (typeof props.onUploadDone === "function") { - props.onUploadDone(response) - } + if (typeof props.onUploadDone === "function") { + props.onUploadDone(response) + } - setUploading(false) - handleOnSuccess(req.file.uid, response) + setUploading(false) + handleOnSuccess(req.file.uid, response) - setTimeout(() => { - setProgess(null) - }, 1000) - }, - }) - } + setTimeout(() => { + setProgress(null) + }, 1000) + }, + }) + } - return -
- { - !progess && (props.icon ?? ) - } + return ( + +
+ {!progress && + (props.icon ?? ( + + ))} - { - progess && null} - /> - } + {progress && ( + null} + /> + )} - { - props.children ?? "Upload" - } -
-
-} \ No newline at end of file + {props.children ?? "Upload"} +
+
+ ) +} diff --git a/packages/app/src/cores/remoteStorage/remoteStorage.core.js b/packages/app/src/cores/remoteStorage/remoteStorage.core.js index 91bd8da0..3cca026c 100755 --- a/packages/app/src/cores/remoteStorage/remoteStorage.core.js +++ b/packages/app/src/cores/remoteStorage/remoteStorage.core.js @@ -84,9 +84,9 @@ export default class RemoteStorage extends Core { _reject(message) }) - uploader.events.on("progress", ({ percentProgress }) => { + uploader.events.on("progress", (data) => { if (typeof onProgress === "function") { - onProgress(file, percentProgress) + onProgress(file, data) } }) diff --git a/packages/server/classes/ChunkFileUpload/index.js b/packages/server/classes/ChunkFileUpload/index.js index 98ea5a1e..e648a11c 100755 --- a/packages/server/classes/ChunkFileUpload/index.js +++ b/packages/server/classes/ChunkFileUpload/index.js @@ -107,8 +107,7 @@ export async function handleChunkFile( { tmpDir, headers, maxFileSize, maxChunkSize }, ) { return await new Promise(async (resolve, reject) => { - const workPath = path.join(tmpDir, headers["uploader-file-id"]) - const chunksPath = path.join(workPath, "chunks") + const chunksPath = path.join(tmpDir, "chunks") const chunkPath = path.join( chunksPath, headers["uploader-chunk-number"], @@ -188,7 +187,7 @@ export async function handleChunkFile( // build data chunksPath: chunksPath, filePath: path.resolve( - workPath, + tmpDir, `${filename}.${extension}`, ), maxFileSize: maxFileSize, @@ -207,38 +206,4 @@ export async function handleChunkFile( }) } -export async function uploadChunkFile( - req, - { tmpDir, maxFileSize, maxChunkSize }, -) { - // create a readable stream from req.body data blob - // - const chunkData = new Blob([req.body], { type: "application/octet-stream" }) - - console.log(chunkData) - - if (!checkChunkUploadHeaders(req.headers)) { - reject(new OperationError(400, "Missing header(s)")) - return - } - - // return await new Promise(async (resolve, reject) => { - // // create a readable node stream from "req.body" (octet-stream) - // await req.multipart(async (field) => { - // try { - // const result = await handleChunkFile(field.file.stream, { - // tmpDir: tmpDir, - // headers: req.headers, - // maxFileSize: maxFileSize, - // maxChunkSize: maxChunkSize, - // }) - - // return resolve(result) - // } catch (error) { - // return reject(error) - // } - // }) - // }) -} - export default uploadChunkFile diff --git a/packages/server/classes/TaskQueueManager/index.js b/packages/server/classes/TaskQueueManager/index.js index 0fbf8a69..24ebd9c4 100644 --- a/packages/server/classes/TaskQueueManager/index.js +++ b/packages/server/classes/TaskQueueManager/index.js @@ -56,14 +56,12 @@ export default class TaskQueueManager { registerQueueEvents = (worker) => { worker.on("progress", (job, progress) => { try { - console.log(`Job ${job.id} reported progress: ${progress}%`) + console.log( + `Job ${job.id} reported progress: ${progress.percent}%`, + ) if (job.data.sseChannelId) { - global.sse.sendToChannel(job.data.sseChannelId, { - status: "progress", - events: "job_progress", - progress, - }) + global.sse.sendToChannel(job.data.sseChannelId, progress) } } catch (error) { // manejar error @@ -76,8 +74,9 @@ export default class TaskQueueManager { if (job.data.sseChannelId) { global.sse.sendToChannel(job.data.sseChannelId, { - status: "done", - result, + event: "done", + state: "done", + result: result, }) } } catch (error) {} @@ -89,7 +88,8 @@ export default class TaskQueueManager { if (job.data.sseChannelId) { global.sse.sendToChannel(job.data.sseChannelId, { - status: "error", + event: "error", + state: "error", result: error.message, }) } @@ -122,9 +122,9 @@ export default class TaskQueueManager { ) await global.sse.sendToChannel(sseChannelId, { - status: "progress", - events: "job_queued", - progress: 5, + event: "job_queued", + state: "progress", + percent: 5, }) } diff --git a/packages/server/services/files/classes/ChunkFileUpload/index.js b/packages/server/services/files/classes/ChunkFile/index.js similarity index 81% rename from packages/server/services/files/classes/ChunkFileUpload/index.js rename to packages/server/services/files/classes/ChunkFile/index.js index 8c677970..b480016d 100755 --- a/packages/server/services/files/classes/ChunkFileUpload/index.js +++ b/packages/server/services/files/classes/ChunkFile/index.js @@ -104,11 +104,9 @@ export function createAssembleChunksPromise({ export async function handleChunkFile( fileStream, - { tmpDir, headers, maxFileSize, maxChunkSize }, + { chunksPath, outputDir, headers, maxFileSize, maxChunkSize }, ) { return await new Promise(async (resolve, reject) => { - const workPath = path.join(tmpDir, headers["uploader-file-id"]) - const chunksPath = path.join(workPath, "chunks") const chunkPath = path.join( chunksPath, headers["uploader-chunk-number"], @@ -125,17 +123,6 @@ export async function handleChunkFile( return reject(new OperationError(500, "Chunk is out of range")) } - // if is the first chunk check if dir exists before write things - if (chunkCount === 0) { - try { - if (!(await fs.promises.stat(chunksPath).catch(() => false))) { - await fs.promises.mkdir(chunksPath, { recursive: true }) - } - } catch (error) { - return reject(new OperationError(500, error.message)) - } - } - let dataWritten = 0 let writeStream = fs.createWriteStream(chunkPath) @@ -172,25 +159,18 @@ export async function handleChunkFile( } if (isLast) { - const mimetype = mimetypes.lookup( - headers["uploader-original-name"], - ) - const extension = mimetypes.extension(mimetype) + // const mimetype = mimetypes.lookup( + // headers["uploader-original-name"], + // ) + // const extension = mimetypes.extension(mimetype) - let filename = headers["uploader-file-id"] - - if (headers["uploader-use-date"] === "true") { - filename = `${filename}_${Date.now()}` - } + let filename = nanoid() return resolve( createAssembleChunksPromise({ // build data chunksPath: chunksPath, - filePath: path.resolve( - workPath, - `${filename}.${extension}`, - ), + filePath: path.resolve(outputDir, filename), maxFileSize: maxFileSize, }), ) diff --git a/packages/server/services/files/classes/Transformation/handlers/a-dash.js b/packages/server/services/files/classes/Transformation/handlers/a-dash.js new file mode 100644 index 00000000..d2ce5fb8 --- /dev/null +++ b/packages/server/services/files/classes/Transformation/handlers/a-dash.js @@ -0,0 +1,38 @@ +import path from "node:path" +import SegmentedAudioMPDJob from "@shared-classes/SegmentedAudioMPDJob" + +export default async ({ filePath, workPath, onProgress }) => { + return new Promise(async (resolve, reject) => { + const outputDir = path.resolve(workPath, "a-dash") + + const job = new SegmentedAudioMPDJob({ + input: filePath, + outputDir: outputDir, + + // set to default as raw flac + audioCodec: "flac", + audioBitrate: "default", + audioSampleRate: "default", + }) + + job.on("start", () => { + console.log("A-DASH started") + }) + + job.on("end", (data) => { + console.log("A-DASH completed", data) + resolve(data) + }) + + job.on("progress", (progress) => { + if (typeof onProgress === "function") { + onProgress({ + percent: progress, + state: "transmuxing", + }) + } + }) + + job.run() + }) +} diff --git a/packages/server/services/files/classes/Transformation/handlers/img-compress.js b/packages/server/services/files/classes/Transformation/handlers/img-compress.js new file mode 100644 index 00000000..eb1c68fa --- /dev/null +++ b/packages/server/services/files/classes/Transformation/handlers/img-compress.js @@ -0,0 +1,43 @@ +import fs from "node:fs" +import path from "node:path" + +import Sharp from "sharp" + +const imageProcessingConf = { + sizeThreshold: 10 * 1024 * 1024, + imageQuality: 80, +} + +const imageTypeToConfig = { + png: { + compressionLevel: Math.floor(imageProcessingConf.imageQuality / 100), + }, + default: { + quality: imageProcessingConf.imageQuality, + }, +} + +export default async ({ filePath, workPath }) => { + const stat = await fs.promises.stat(file.filepath) + + // ignore if too small + if (stat.size < imageProcessingConf.sizeThreshold) { + return file + } + + let image = await Sharp(filePath) + + const { format } = await image.metadata() + + image = await image[format]( + imageTypeToConfig[format] ?? imageTypeToConfig.default, + ).withMetadata() + + filePath = path.resolve(workPath, `${path.basename(filePath)}_ff`) + + await image.toFile(outputFilepath) + + return { + filePath: filePath, + } +} diff --git a/packages/server/services/files/classes/Transformation/handlers/mq-hls.js b/packages/server/services/files/classes/Transformation/handlers/mq-hls.js new file mode 100644 index 00000000..f6ce093f --- /dev/null +++ b/packages/server/services/files/classes/Transformation/handlers/mq-hls.js @@ -0,0 +1,50 @@ +import path from "node:path" +import MultiqualityHLSJob from "@shared-classes/MultiqualityHLSJob" + +export default async ({ filePath, workPath, onProgress }) => { + return new Promise(async (resolve, reject) => { + const outputDir = path.resolve(workPath, "mqhls") + + const job = new MultiqualityHLSJob({ + input: filePath, + outputDir: outputDir, + + // set default + outputMasterName: "master.m3u8", + levels: [ + { + original: true, + codec: "libx264", + bitrate: "10M", + preset: "ultrafast", + }, + { + codec: "libx264", + width: 1280, + bitrate: "3M", + preset: "ultrafast", + }, + ], + }) + + job.on("start", () => { + console.log("A-DASH started") + }) + + job.on("end", (data) => { + console.log("A-DASH completed", data) + resolve(data) + }) + + job.on("progress", (progress) => { + if (typeof onProgress === "function") { + onProgress({ + percent: progress, + state: "transmuxing", + }) + } + }) + + job.run() + }) +} diff --git a/packages/server/services/files/classes/Transformation/handlers/video-compress.js b/packages/server/services/files/classes/Transformation/handlers/video-compress.js new file mode 100644 index 00000000..c95e28c6 --- /dev/null +++ b/packages/server/services/files/classes/Transformation/handlers/video-compress.js @@ -0,0 +1,6 @@ +export default async ({ filePath, workPath }) => { + // TODO: Implement video compression logic + return { + filePath: filePath, + } +} diff --git a/packages/server/services/files/classes/Transformation/index.ts b/packages/server/services/files/classes/Transformation/index.ts new file mode 100644 index 00000000..18087fd0 --- /dev/null +++ b/packages/server/services/files/classes/Transformation/index.ts @@ -0,0 +1,27 @@ +const Handlers = { + "a-dash": require("./handlers/a-dash").default, + "mq-hls": require("./handlers/mq-hls").default, + "img-compress": require("./handlers/img-compress").default, + "video-compress": require("./handlers/video-compress").default, +} + +export type TransformationPayloadType = { + filePath: string + workPath: string + handler: string + onProgress?: function +} + +class Transformation { + static async transform(payload: TransformationPayloadType) { + const handler = Handlers[payload.handler] + + if (typeof handler !== "function") { + throw new Error(`Invalid handler: ${payload.handler}`) + } + + return await handler(payload) + } +} + +export default Transformation diff --git a/packages/server/services/files/classes/Upload/index.ts b/packages/server/services/files/classes/Upload/index.ts new file mode 100644 index 00000000..c8bf62b1 --- /dev/null +++ b/packages/server/services/files/classes/Upload/index.ts @@ -0,0 +1,107 @@ +import fs from "node:fs" +import path from "node:path" + +import mimeTypes from "mime-types" +import {fileTypeFromBuffer} from 'file-type' +import readChunk from "@utils/readChunk" + +import getFileHash from "@shared-utils/readFileHash" +import putObject from "./putObject" + +import Transformation, { TransformationPayloadType } from "../Transformation" + +export type FileHandlePayload = { + user_id: string + filePath: string + workPath: string + uploadId?: string + transformations?: Array + s3Provider?: string + onProgress?: Function +} + +export type S3UploadPayload = { + filePath: string + basePath: string + targePath?: string +} + +export default class Upload { + static fileHandle = async (payload: FileHandlePayload) => { + // process + const processed = await Upload.process(payload) + + // overwrite filePath + payload.filePath = processed.filePath + + // upload + const result = await Upload.toS3({ + filePath: payload.filePath, + targetPath: payload.targetPath, + basePath: payload.user_id, + }) + + // delete workpath + await fs.promises.rm(payload.workPath, { recursive: true, force: true }) + + return result + } + + static process = async (payload: FileHandlePayload) => { + if (Array.isArray(payload.transformations)) { + for await (const transformation: TransformationPayloadType of payload.transformations) { + const transformationResult = await Transformation.transform({ + filePath: payload.filePath, + workPath: payload.workPath, + onProgress: payload.onProgress, + handler: transformation, + }) + + // if is a file, overwrite filePath + if (transformationResult.outputFile) { + payload.filePath = transformationResult.outputFile + } + + // if is a directory, overwrite filePath to upload entire directory + if (transformationResult.outputPath) { + payload.filePath = transformationResult.outputPath + payload.targetPath = transformationResult.outputFile + payload.isDirectory = true + } + } + } + + return payload + } + + static toS3 = async (payload: S3UploadPayload, onProgress?: Function) => { + const { filePath, basePath, targetPath } = payload + + const firstBuffer = await readChunk(targetPath ?? filePath, { length: 4100 }) + const fileHash = await getFileHash(fs.createReadStream(targetPath ?? filePath)) + const fileType = await fileTypeFromBuffer(firstBuffer) + + const uploadPath = path.join(basePath, path.basename(filePath)) + + const metadata = { + "File-Hash": fileHash, + "Content-Type": fileType.mime, + } + + if (typeof onProgress === "function") { + onProgress({ + percent: 0, + state: "uploading_s3", + }) + } + + const result = await putObject({ + filePath: filePath, + uploadPath: uploadPath, + metadata: metadata, + targetFilename: targetPath ? path.basename(targetPath) : null, + }) + + return result + } +} diff --git a/packages/server/services/files/classes/Upload/putObject.js b/packages/server/services/files/classes/Upload/putObject.js new file mode 100644 index 00000000..89f0d51a --- /dev/null +++ b/packages/server/services/files/classes/Upload/putObject.js @@ -0,0 +1,58 @@ +import fs from "node:fs" +import path from "node:path" +import pMap from "p-map" + +export default async function standardUpload({ + filePath, + uploadPath, + metadata = {}, + targetFilename, + onFinish, +}) { + const isDirectory = await fs.promises + .lstat(filePath) + .then((stats) => stats.isDirectory()) + + if (isDirectory) { + let files = await fs.promises.readdir(filePath) + + files = files.map((file) => { + const newPath = path.join(filePath, file) + + return { + filePath: newPath, + uploadPath: path.join(uploadPath, file), + } + }) + + await pMap(files, standardUpload, { + concurrency: 3, + }) + + return { + id: uploadPath, + url: global.storage.composeRemoteURL(uploadPath, targetFilename), + metadata: metadata, + } + } + + // upload to storage + await global.storage.fPutObject( + process.env.S3_BUCKET, + uploadPath, + filePath, + metadata, + ) + + const result = { + id: uploadPath, + url: global.storage.composeRemoteURL(uploadPath), + metadata: metadata, + } + + if (typeof onFinish === "function") { + await onFinish(result) + } + + return result +} diff --git a/packages/server/services/files/file.service.js b/packages/server/services/files/file.service.js index 0e0db8b2..598a155a 100755 --- a/packages/server/services/files/file.service.js +++ b/packages/server/services/files/file.service.js @@ -3,6 +3,7 @@ import { Server } from "linebridge" import B2 from "backblaze-b2" import DbManager from "@shared-classes/DbManager" +import RedisClient from "@shared-classes/RedisClient" import StorageClient from "@shared-classes/StorageClient" import CacheService from "@shared-classes/CacheService" import SSEManager from "@shared-classes/SSEManager" @@ -10,9 +11,21 @@ import SharedMiddlewares from "@shared-middlewares" import LimitsClass from "@shared-classes/Limits" import TaskQueueManager from "@shared-classes/TaskQueueManager" +// import * as Minio from 'minio' + +// class StorageNG { +// constructor() { + +// } + +// async initialize() { + +// } +// } + class API extends Server { static refName = "files" - static useEngine = "hyper-express" + static useEngine = "hyper-express-ng" static routesPath = `${__dirname}/routes` static listen_port = process.env.HTTP_LISTEN_PORT ?? 3002 static enableWebsockets = true @@ -27,6 +40,9 @@ class API extends Server { storage: StorageClient(), b2Storage: null, SSEManager: new SSEManager(), + redis: RedisClient({ + maxRetriesPerRequest: null, + }), limits: {}, } @@ -55,8 +71,9 @@ class API extends Server { ) } + await this.contexts.redis.initialize() await this.queuesManager.initialize({ - redisOptions: this.engine.ws.redis.options, + redisOptions: this.contexts.redis.client, }) await this.contexts.db.initialize() await this.contexts.storage.initialize() diff --git a/packages/server/services/files/package.json b/packages/server/services/files/package.json index a2a06202..a592a496 100755 --- a/packages/server/services/files/package.json +++ b/packages/server/services/files/package.json @@ -1,20 +1,21 @@ { - "name": "files", - "version": "0.60.2", - "dependencies": { - "backblaze-b2": "^1.7.0", - "busboy": "^1.6.0", - "content-range": "^2.0.2", - "ffmpeg-static": "^5.2.0", - "fluent-ffmpeg": "^2.1.2", - "merge-files": "^0.1.2", - "mime-types": "^2.1.35", - "minio": "^7.0.32", - "normalize-url": "^8.0.0", - "p-map": "4", - "p-queue": "^7.3.4", - "redis": "^4.6.6", - "sharp": "0.32.6", - "split-chunk-merge": "^1.0.0" - } + "name": "files", + "version": "0.60.2", + "dependencies": { + "backblaze-b2": "^1.7.0", + "busboy": "^1.6.0", + "content-range": "^2.0.2", + "ffmpeg-static": "^5.2.0", + "file-type": "^20.4.1", + "fluent-ffmpeg": "^2.1.2", + "merge-files": "^0.1.2", + "mime-types": "^2.1.35", + "minio": "^7.0.32", + "normalize-url": "^8.0.0", + "p-map": "4", + "p-queue": "^7.3.4", + "redis": "^4.6.6", + "sharp": "0.32.6", + "split-chunk-merge": "^1.0.0" + } } diff --git a/packages/server/services/files/queues/TransmuxedUpload/index.js b/packages/server/services/files/queues/TransmuxedUpload/index.js deleted file mode 100644 index 18b2d029..00000000 --- a/packages/server/services/files/queues/TransmuxedUpload/index.js +++ /dev/null @@ -1,49 +0,0 @@ -import path from "node:path" - -import fs from "node:fs" -import RemoteUpload from "@services/remoteUpload" - -export default { - id: "remote_upload", - maxJobs: 10, - process: async (job) => { - const { - filePath, - parentDir, - service, - useCompression, - cachePath, - transmux, - transmuxOptions, - } = job.data - - console.log("[JOB][remote_upload] Processing job >", job.data) - - try { - const result = await RemoteUpload({ - parentDir: parentDir, - source: filePath, - service: service, - useCompression: useCompression, - transmux: transmux, - transmuxOptions: transmuxOptions, - cachePath: cachePath, - onProgress: (progress) => { - job.updateProgress(progress) - }, - }) - - await fs.promises - .rm(filePath, { recursive: true, force: true }) - .catch(() => null) - - return result - } catch (error) { - await fs.promises - .rm(filePath, { recursive: true, force: true }) - .catch(() => null) - - throw error - } - }, -} diff --git a/packages/server/services/files/queues/TransmuxedUpload/worker.js b/packages/server/services/files/queues/TransmuxedUpload/worker.js deleted file mode 100644 index 39bf8728..00000000 --- a/packages/server/services/files/queues/TransmuxedUpload/worker.js +++ /dev/null @@ -1,43 +0,0 @@ -import RemoteUpload from "@services/remoteUpload" -import fs from "node:fs" - -module.exports = async (job) => { - const { - filePath, - parentDir, - service, - useCompression, - cachePath, - transmux, - transmuxOptions, - } = job.data - - console.log("[JOB][remote_upload] Processing job >", job.data) - - try { - const result = await RemoteUpload({ - parentDir: parentDir, - source: filePath, - service: service, - useCompression: useCompression, - transmux: transmux, - transmuxOptions: transmuxOptions, - cachePath: cachePath, - onProgress: (progress) => { - job.progress(progress) - }, - }) - - await fs.promises - .rm(filePath, { recursive: true, force: true }) - .catch(() => null) - - return result - } catch (error) { - await fs.promises - .rm(filePath, { recursive: true, force: true }) - .catch(() => null) - - throw error - } -} diff --git a/packages/server/services/files/queues/fileProcess/index.js b/packages/server/services/files/queues/fileProcess/index.js new file mode 100644 index 00000000..890720cc --- /dev/null +++ b/packages/server/services/files/queues/fileProcess/index.js @@ -0,0 +1,29 @@ +import path from "node:path" +import fs from "node:fs" + +import Upload from "@classes/Upload" + +export default { + id: "file-process", + maxJobs: 2, + process: async (job) => { + console.log("[JOB][file-process] starting... >", job.data) + + try { + const result = await Upload.fileHandle({ + ...job.data, + onProgress: (progress) => { + job.updateProgress(progress) + }, + }) + + return result + } catch (error) { + await fs.promises + .rm(tmpPath, { recursive: true, force: true }) + .catch(() => null) + + throw error + } + }, +} diff --git a/packages/server/services/files/routes/transcode/get.js b/packages/server/services/files/routes/transcode/get.js deleted file mode 100644 index b358bf41..00000000 --- a/packages/server/services/files/routes/transcode/get.js +++ /dev/null @@ -1,88 +0,0 @@ -import path from "node:path" -import fs from "node:fs" -import axios from "axios" - -import MultiqualityHLSJob from "@shared-classes/MultiqualityHLSJob" -import { standardUpload } from "@services/remoteUpload" - -export default { - useContext: ["cache", "limits"], - middlewares: ["withAuthentication"], - fn: async (req, res) => { - const { url } = req.query - - const userPath = path.join(this.default.contexts.cache.constructor.cachePath, req.auth.session.user_id) - - const jobId = String(new Date().getTime()) - const jobPath = path.resolve(userPath, "jobs", jobId) - - const sourcePath = path.resolve(jobPath, `${jobId}.source`) - - if (!fs.existsSync(jobPath)) { - fs.mkdirSync(jobPath, { recursive: true }) - } - - const sourceStream = fs.createWriteStream(sourcePath) - - const response = await axios({ - method: "get", - url, - responseType: "stream", - }) - - response.data.pipe(sourceStream) - - await new Promise((resolve, reject) => { - sourceStream.on("finish", () => { - resolve() - }) - sourceStream.on("error", (err) => { - reject(err) - }) - }) - - const job = new MultiqualityHLSJob({ - input: sourcePath, - outputDir: jobPath, - levels: [ - { - original: true, - codec: "libx264", - bitrate: "10M", - preset: "ultrafast", - }, - { - codec: "libx264", - width: 1280, - bitrate: "3M", - preset: "ultrafast", - } - ] - }) - - await new Promise((resolve, reject) => { - job - .on("error", (err) => { - console.error(`[TRANSMUX] Transmuxing failed`, err) - reject(err) - }) - .on("end", () => { - console.debug(`[TRANSMUX] Finished transmuxing > ${sourcePath}`) - resolve() - }) - .run() - }) - - const result = await standardUpload({ - isDirectory: true, - source: path.join(jobPath, "hls"), - remotePath: `${req.auth.session.user_id}/jobs/${jobId}`, - }) - - fs.rmSync(jobPath, { recursive: true, force: true }) - - return { - result - } - } -} \ No newline at end of file diff --git a/packages/server/services/files/routes/upload/chunk/post.js b/packages/server/services/files/routes/upload/chunk/post.js index 63ff90fd..5ae95310 100644 --- a/packages/server/services/files/routes/upload/chunk/post.js +++ b/packages/server/services/files/routes/upload/chunk/post.js @@ -1,21 +1,12 @@ -import { Duplex } from "node:stream" import path from "node:path" import fs from "node:fs" -import RemoteUpload from "@services/remoteUpload" -import { - checkChunkUploadHeaders, - handleChunkFile, -} from "@classes/ChunkFileUpload" + +import { checkChunkUploadHeaders, handleChunkFile } from "@classes/ChunkFile" +import Upload from "@classes/Upload" +import bufferToStream from "@utils/bufferToStream" const availableProviders = ["b2", "standard"] -function bufferToStream(bf) { - let tmp = new Duplex() - tmp.push(bf) - tmp.push(null) - return tmp -} - export default { useContext: ["cache", "limits"], middlewares: ["withAuthentication"], @@ -25,14 +16,16 @@ export default { return } - const uploadId = `${req.headers["uploader-file-id"]}_${Date.now()}` + const uploadId = `${req.headers["uploader-file-id"]}` - const tmpPath = path.resolve( + const workPath = path.resolve( this.default.contexts.cache.constructor.cachePath, - req.auth.session.user_id, + `${req.auth.session.user_id}-${uploadId}`, ) + const chunksPath = path.join(workPath, "chunks") + const assembledPath = path.join(workPath, "assembled") - const limits = { + const config = { maxFileSize: parseInt(this.default.contexts.limits.maxFileSizeInMB) * 1024 * @@ -46,88 +39,82 @@ export default { } // const user = await req.auth.user() - // if (user.roles.includes("admin")) { // // maxFileSize for admins 100GB // limits.maxFileSize = 100 * 1024 * 1024 * 1024 - // // optional compression for admins // limits.useCompression = req.headers["use-compression"] ?? false - // limits.useProvider = req.headers["provider-type"] ?? "b2" // } // check if provider is valid - if (!availableProviders.includes(limits.useProvider)) { + if (!availableProviders.includes(config.useProvider)) { throw new OperationError(400, "Invalid provider") } - // create a readable stream from req.body(buffer) + await fs.promises.mkdir(workPath, { recursive: true }) + await fs.promises.mkdir(chunksPath, { recursive: true }) + await fs.promises.mkdir(assembledPath, { recursive: true }) + + // create a readable stream const dataStream = bufferToStream(await req.buffer()) - let result = await handleChunkFile(dataStream, { - tmpDir: tmpPath, + let assemble = await handleChunkFile(dataStream, { + chunksPath: chunksPath, + outputDir: assembledPath, headers: req.headers, - maxFileSize: limits.maxFileSize, - maxChunkSize: limits.maxChunkSize, + maxFileSize: config.maxFileSize, + maxChunkSize: config.maxChunkSize, }) - if (typeof result === "function") { - try { - result = await result() + const useJob = true - if (req.headers["transmux"] || limits.useCompression === true) { - // add a background task + if (typeof assemble === "function") { + try { + assemble = await assemble() + + let transformations = req.headers["transformations"] + + if (transformations) { + transformations = transformations + .split(",") + .map((t) => t.trim()) + } + + const payload = { + user_id: req.auth.session.user_id, + uploadId: uploadId, + filePath: assemble.filePath, + workPath: workPath, + transformations: transformations, + } + + // if has transformations, use background job + if (transformations && transformations.length > 0) { const job = await global.queues.createJob( - "remote_upload", - { - filePath: result.filePath, - parentDir: req.auth.session.user_id, - service: limits.useProvider, - useCompression: limits.useCompression, - transmux: req.headers["transmux"] ?? false, - transmuxOptions: req.headers["transmux-options"], - cachePath: tmpPath, - }, + "file-process", + payload, { useSSE: true, }, ) - const sseChannelId = job.sseChannelId - return { - uploadId: uploadId, - sseChannelId: sseChannelId, - eventChannelURL: `${req.headers["x-forwarded-proto"] || req.protocol}://${req.get("host")}/upload/sse_events/${sseChannelId}`, + uploadId: payload.uploadId, + sseChannelId: job.sseChannelId, + sseUrl: `${req.headers["x-forwarded-proto"] || req.protocol}://${req.get("host")}/upload/sse_events/${job.sseChannelId}`, } - } else { - const result = await RemoteUpload({ - source: result.filePath, - parentDir: req.auth.session.user_id, - service: limits.useProvider, - useCompression: limits.useCompression, - cachePath: tmpPath, - }) - - return result } - } catch (error) { - await fs.promises - .rm(tmpPath, { recursive: true, force: true }) - .catch(() => { - return false - }) - throw new OperationError( - error.code ?? 500, - error.message ?? "Failed to upload file", - ) + return await Upload.fileHandle(payload) + } catch (error) { + await fs.promises.rm(workPath, { recursive: true }) + throw error } } return { - ok: 1, + next: true, chunkNumber: req.headers["uploader-chunk-number"], } }, diff --git a/packages/server/services/files/routes/upload/file/post.js b/packages/server/services/files/routes/upload/file/post.js index 37a0a687..3f1fc511 100644 --- a/packages/server/services/files/routes/upload/file/post.js +++ b/packages/server/services/files/routes/upload/file/post.js @@ -1,48 +1,44 @@ import path from "node:path" import fs from "node:fs" -import RemoteUpload from "@services/remoteUpload" +import Upload from "@classes/Upload" export default { - useContext: ["cache"], - middlewares: [ - "withAuthentication", - ], - fn: async (req, res) => { - const { cache } = this.default.contexts + useContext: ["cache"], + middlewares: ["withAuthentication"], + fn: async (req, res) => { + const workPath = path.resolve( + this.default.contexts.cache.constructor.cachePath, + `${req.auth.session.user_id}-${nanoid()}`, + ) - const providerType = req.headers["provider-type"] ?? "standard" + await fs.promises.mkdir(workPath, { recursive: true }) - const userPath = path.join(cache.constructor.cachePath, req.auth.session.user_id) + let localFilepath = null - let localFilepath = null - let tmpPath = path.resolve(userPath, `${Date.now()}`) + await req.multipart(async (field) => { + if (!field.file) { + throw new OperationError(400, "Missing file") + } - await req.multipart(async (field) => { - if (!field.file) { - throw new OperationError(400, "Missing file") - } + localFilepath = path.join(workPath, "file") - localFilepath = path.join(tmpPath, field.file.name) + await field.write(localFilepath) + }) - const existTmpDir = await fs.promises.stat(tmpPath).then(() => true).catch(() => false) + let transformations = req.headers["transformations"] - if (!existTmpDir) { - await fs.promises.mkdir(tmpPath, { recursive: true }) - } + if (transformations) { + transformations = transformations.split(",").map((t) => t.trim()) + } - await field.write(localFilepath) - }) + const result = await Upload.fileHandle({ + user_id: req.auth.session.user_id, + filePath: localFilepath, + workPath: workPath, + transformations: transformations, + }) - const result = await RemoteUpload({ - parentDir: req.auth.session.user_id, - source: localFilepath, - service: providerType, - useCompression: ToBoolean(req.headers["use-compression"]) ?? true, - }) - - fs.promises.rm(tmpPath, { recursive: true, force: true }) - - return result - } + return result + }, } diff --git a/packages/server/services/files/services/post-process/audio/index.js b/packages/server/services/files/services/post-process/audio/index.js deleted file mode 100755 index 5794e567..00000000 --- a/packages/server/services/files/services/post-process/audio/index.js +++ /dev/null @@ -1,30 +0,0 @@ -const ffmpeg = require("fluent-ffmpeg") - -export default async (file) => { - // analize metadata - let metadata = await new Promise((resolve, reject) => { - ffmpeg.ffprobe(file.filepath, (err, data) => { - if (err) { - return reject(err) - } - - resolve(data) - }) - }).catch((err) => { - console.error(err) - - return {} - }) - - if (metadata.format) { - metadata = metadata.format - } - - file.metadata = { - duration: metadata.duration, - bitrate: metadata.bit_rate, - size: metadata.size, - } - - return file -} \ No newline at end of file diff --git a/packages/server/services/files/services/post-process/image/index.js b/packages/server/services/files/services/post-process/image/index.js deleted file mode 100755 index 9ba1aacc..00000000 --- a/packages/server/services/files/services/post-process/image/index.js +++ /dev/null @@ -1,59 +0,0 @@ -import fs from "node:fs" -import path from "node:path" -import Sharp from "sharp" - -const imageProcessingConf = { - // TODO: Get image sizeThreshold from DB - sizeThreshold: 10 * 1024 * 1024, - // TODO: Get image quality from DB - imageQuality: 80, -} - -const imageTypeToConfig = { - png: { - compressionLevel: Math.floor(imageProcessingConf.imageQuality / 100), - }, - default: { - quality: imageProcessingConf.imageQuality - } -} - -/** - * Processes an image file and transforms it if it's above a certain size threshold. - * - * @async - * @function - * @param {Object} file - The file to be processed. - * @param {string} file.filepath - The path of the file to be processed. - * @param {string} file.hash - The hash of the file to be processed. - * @param {string} file.cachePath - The cache path of the file to be processed. - * @throws {Error} If the file parameter is not provided. - * @return {Object} The processed file object. - */ -async function processImage(file) { - if (!file) { - throw new Error("file is required") - } - - const stat = await fs.promises.stat(file.filepath) - - if (stat.size < imageProcessingConf.sizeThreshold) { - return file - } - - let image = await Sharp(file.filepath) - - const { format } = await image.metadata() - - image = await image[format](imageTypeToConfig[format] ?? imageTypeToConfig.default).withMetadata() - - const outputFilepath = path.resolve(file.cachePath, `${file.hash}_transformed.${format}`) - - await image.toFile(outputFilepath) - - file.filepath = outputFilepath - - return file -} - -export default processImage \ No newline at end of file diff --git a/packages/server/services/files/services/post-process/index.js b/packages/server/services/files/services/post-process/index.js deleted file mode 100755 index 17726714..00000000 --- a/packages/server/services/files/services/post-process/index.js +++ /dev/null @@ -1,53 +0,0 @@ -import fs from "node:fs" -import mimetypes from "mime-types" - -import processVideo from "./video" -import processImage from "./image" -import processAudio from "./audio" - -const fileTransformer = { - // video - "video/avi": processVideo, - "video/quicktime": processVideo, - "video/mp4": processVideo, - "video/webm": processVideo, - //image - "image/jpeg": processImage, - "image/png": processImage, - "image/gif": processImage, - "image/bmp": processImage, - "image/tiff": processImage, - "image/webp": processImage, - "image/jfif": processImage, - // audio - "audio/flac": processAudio, - "audio/x-flac": processAudio, - "audio/mp3": processAudio, - "audio/x-mp3": processAudio, - "audio/mpeg": processAudio, - "audio/x-mpeg": processAudio, - "audio/ogg": processAudio, - "audio/x-ogg": processAudio, - "audio/wav": processAudio, - "audio/x-wav": processAudio, -} - -export default async (file) => { - if (!file) { - throw new Error("file is required") - } - - if (!fs.existsSync(file.filepath)) { - throw new Error(`File ${file.filepath} not found`) - } - - const fileMimetype = mimetypes.lookup(file.filepath) - - if (typeof fileTransformer[fileMimetype] !== "function") { - console.debug(`File (${file.filepath}) has mimetype ${fileMimetype} and will not be processed`) - - return file - } - - return await fileTransformer[fileMimetype](file) -} \ No newline at end of file diff --git a/packages/server/services/files/services/post-process/video/index.js b/packages/server/services/files/services/post-process/video/index.js deleted file mode 100755 index b4f1b644..00000000 --- a/packages/server/services/files/services/post-process/video/index.js +++ /dev/null @@ -1,43 +0,0 @@ -import videoTranscode from "@services/videoTranscode" - -/** - * Processes a video file based on the specified options. - * - * @async - * @param {Object} file - The video file to process. - * @param {Object} [options={}] - The options object to use for processing. - * @param {string} [options.videoCodec="libx264"] - The video codec to use. - * @param {string} [options.format="mp4"] - The format to use. - * @param {number} [options.audioBitrate=128] - The audio bitrate to use. - * @param {number} [options.videoBitrate=2024] - The video bitrate to use. - * @throws {Error} Throws an error if file parameter is not provided. - * @return {Object} The processed video file object. - */ -async function processVideo(file, options = {}) { - if (!file) { - throw new Error("file is required") - } - - // TODO: Get values from db - const { - videoCodec = "libx264", - format = "mp4", - audioBitrate = 128, - videoBitrate = 3000, - } = options - - const result = await videoTranscode(file.filepath, { - videoCodec, - format, - audioBitrate, - videoBitrate: [videoBitrate, true], - extraOptions: ["-threads 2"], - }) - - file.filepath = result.filepath - file.filename = result.filename - - return file -} - -export default processVideo diff --git a/packages/server/services/files/services/remoteUpload/index.js b/packages/server/services/files/services/remoteUpload/index.js deleted file mode 100644 index d2c904e0..00000000 --- a/packages/server/services/files/services/remoteUpload/index.js +++ /dev/null @@ -1,162 +0,0 @@ -import fs from "node:fs" -import path from "node:path" -import mimeTypes from "mime-types" -import getFileHash from "@shared-utils/readFileHash" - -import PostProcess from "../post-process" -import Transmux from "../transmux" - -import StandardUpload from "./providers/standard" -import B2Upload from "./providers/b2" - -export default async ({ - source, - parentDir, - service, - useCompression, - cachePath, - transmux, - transmuxOptions, - isDirectory, - onProgress, -}) => { - if (!source) { - throw new OperationError(500, "source is required") - } - - if (!service) { - service = "standard" - } - - if (!parentDir) { - parentDir = "/" - } - - if (transmuxOptions) { - transmuxOptions = JSON.parse(transmuxOptions) - } - - if (useCompression) { - if (typeof onProgress === "function") { - onProgress(10, { - event: "post_processing", - }) - } - - try { - const processOutput = await PostProcess({ - filepath: source, - cachePath: cachePath, - }) - - if (processOutput) { - if (processOutput.filepath) { - source = processOutput.filepath - } - } - } catch (error) { - console.error(error) - throw new OperationError(500, `Failed to process file`) - } - } - - if (transmux) { - if (typeof onProgress === "function") { - onProgress(30, { - event: "transmuxing", - }) - } - - try { - const processOutput = await Transmux({ - transmuxer: transmux, - transmuxOptions: transmuxOptions, - filepath: source, - cachePath: cachePath, - }) - - if (processOutput) { - if (processOutput.filepath) { - source = processOutput.filepath - } - - if (processOutput.isDirectory) { - isDirectory = true - } - } - } catch (error) { - console.error(error) - throw new OperationError(500, `Failed to transmux file`) - } - } - - const type = mimeTypes.lookup(path.basename(source)) - const hash = await getFileHash(fs.createReadStream(source)) - - let fileId = `${hash}` - - // FIXME: This is a walkaround to avoid to hashing the entire directories - if (isDirectory) { - fileId = global.nanoid() - } - - let remotePath = path.join(parentDir, fileId) - - let result = {} - - const metadata = { - "Content-Type": type, - "File-Hash": hash, - } - - if (typeof onProgress === "function") { - onProgress(80, { - event: "uploading_s3", - service: service, - }) - } - - try { - switch (service) { - case "b2": - if (!global.b2Storage) { - throw new OperationError( - 500, - "B2 storage not configured on environment, unsupported service. Please use `standard` service.", - ) - } - - result = await B2Upload({ - source: isDirectory ? path.dirname(source) : source, - remotePath: remotePath, - metadata: metadata, - isDirectory: isDirectory, - targetFilename: isDirectory ? path.basename(source) : null, - }) - break - case "standard": - result = await StandardUpload({ - source: isDirectory ? path.dirname(source) : source, - remotePath: remotePath, - metadata: metadata, - isDirectory: isDirectory, - targetFilename: isDirectory ? path.basename(source) : null, - }) - break - default: - throw new OperationError(500, "Unsupported service") - } - } catch (error) { - console.error(error) - throw new OperationError(500, "Failed to upload to storage") - } - - if (typeof onProgress === "function") { - onProgress(100, { - event: "done", - result: result, - }) - } - - return result -} diff --git a/packages/server/services/files/services/remoteUpload/providers/b2/index.js b/packages/server/services/files/services/remoteUpload/providers/b2/index.js deleted file mode 100644 index fb0c7c0e..00000000 --- a/packages/server/services/files/services/remoteUpload/providers/b2/index.js +++ /dev/null @@ -1,90 +0,0 @@ -import fs from "node:fs" -import path from "node:path" -import pMap from "p-map" - -export default async function b2Upload({ - source, - remotePath, - metadata = {}, - targetFilename, - isDirectory, - retryNumber = 0 -}) { - if (isDirectory) { - let files = await fs.promises.readdir(source) - - files = files.map((file) => { - const filePath = path.join(source, file) - - const isTargetDirectory = fs.lstatSync(filePath).isDirectory() - - return { - source: filePath, - remotePath: path.join(remotePath, file), - isDirectory: isTargetDirectory, - } - }) - - await pMap( - files, - b2Upload, - { - concurrency: 5 - } - ) - - return { - id: remotePath, - url: `https://${process.env.B2_CDN_ENDPOINT}/${process.env.B2_BUCKET}/${remotePath}/${targetFilename}`, - metadata: metadata, - } - } - - try { - await global.b2Storage.authorize() - - if (!fs.existsSync(source)) { - throw new OperationError(500, "File not found") - } - - const uploadUrl = await global.b2Storage.getUploadUrl({ - bucketId: process.env.B2_BUCKET_ID, - }) - - console.debug(`Uploading object to B2 Storage >`, { - source: source, - remote: remotePath, - }) - - const data = await fs.promises.readFile(source) - - await global.b2Storage.uploadFile({ - uploadUrl: uploadUrl.data.uploadUrl, - uploadAuthToken: uploadUrl.data.authorizationToken, - fileName: remotePath, - data: data, - info: metadata - }) - } catch (error) { - console.error(error) - - if (retryNumber < 5) { - return await b2Upload({ - source, - remotePath, - metadata, - targetFilename, - isDirectory, - retryNumber: retryNumber + 1 - }) - } - - throw new OperationError(500, "B2 upload failed") - } - - return { - id: remotePath, - url: `https://${process.env.B2_CDN_ENDPOINT}/${process.env.B2_BUCKET}/${remotePath}`, - metadata: metadata, - } -} \ No newline at end of file diff --git a/packages/server/services/files/services/remoteUpload/providers/standard/index.js b/packages/server/services/files/services/remoteUpload/providers/standard/index.js deleted file mode 100644 index 3dfa3fcf..00000000 --- a/packages/server/services/files/services/remoteUpload/providers/standard/index.js +++ /dev/null @@ -1,58 +0,0 @@ -import fs from "node:fs" -import path from "node:path" -import pMap from "p-map" - -export default async function standardUpload({ - source, - remotePath, - metadata = {}, - targetFilename, - isDirectory, -}) { - if (isDirectory) { - let files = await fs.promises.readdir(source) - - files = files.map((file) => { - const filePath = path.join(source, file) - - const isTargetDirectory = fs.lstatSync(filePath).isDirectory() - - return { - source: filePath, - remotePath: path.join(remotePath, file), - isDirectory: isTargetDirectory, - } - }) - - await pMap( - files, - standardUpload, - { - concurrency: 3 - } - ) - - return { - id: remotePath, - url: global.storage.composeRemoteURL(remotePath, targetFilename), - metadata: metadata, - } - } - - console.debug(`Uploading object to S3 Minio >`, { - source: source, - remote: remotePath, - }) - - // upload to storage - await global.storage.fPutObject(process.env.S3_BUCKET, remotePath, source, metadata) - - // compose url - const url = global.storage.composeRemoteURL(remotePath) - - return { - id: remotePath, - url: url, - metadata: metadata, - } -} \ No newline at end of file diff --git a/packages/server/services/files/services/transmux/index.js b/packages/server/services/files/services/transmux/index.js deleted file mode 100644 index 1b4d3cae..00000000 --- a/packages/server/services/files/services/transmux/index.js +++ /dev/null @@ -1,108 +0,0 @@ -import fs from "node:fs" -import path from "node:path" - -import MultiqualityHLSJob from "@shared-classes/MultiqualityHLSJob" -import SegmentedAudioMPDJob from "@shared-classes/SegmentedAudioMPDJob" - -const transmuxers = [ - { - id: "mq-hls", - container: "hls", - extension: "m3u8", - multipleOutput: true, - buildCommand: (input, outputDir) => { - return new MultiqualityHLSJob({ - input: input, - outputDir: outputDir, - outputMasterName: "master.m3u8", - levels: [ - { - original: true, - codec: "libx264", - bitrate: "10M", - preset: "ultrafast", - }, - { - codec: "libx264", - width: 1280, - bitrate: "3M", - preset: "ultrafast", - } - ] - }) - }, - }, - { - id: "a-dash", - container: "dash", - extension: "mpd", - multipleOutput: true, - buildCommand: (input, outputDir) => { - return new SegmentedAudioMPDJob({ - input: input, - outputDir: outputDir, - outputMasterName: "master.mpd", - - audioCodec: "flac", - //audioBitrate: "1600k", - //audioSampleRate: 96000, - segmentTime: 10, - }) - } - }, -] - -export default async (params) => { - if (!params) { - throw new Error("params is required") - } - - if (!params.filepath) { - throw new Error("filepath is required") - } - - if (!params.cachePath) { - throw new Error("cachePath is required") - } - - if (!params.transmuxer) { - throw new Error("transmuxer is required") - } - - if (!fs.existsSync(params.filepath)) { - throw new Error(`File ${params.filepath} not found`) - } - - const transmuxer = transmuxers.find((item) => item.id === params.transmuxer) - - if (!transmuxer) { - throw new Error(`Transmuxer ${params.transmuxer} not found`) - } - - const jobPath = path.dirname(params.filepath) - - if (!fs.existsSync(path.dirname(jobPath))) { - fs.mkdirSync(path.dirname(jobPath), { recursive: true }) - } - - return await new Promise((resolve, reject) => { - try { - const command = transmuxer.buildCommand(params.filepath, jobPath) - - command - .on("progress", function (progress) { - console.log("Processing: " + progress.percent + "% done") - }) - .on("error", (err) => { - reject(err) - }) - .on("end", (data) => { - resolve(data) - }) - .run() - } catch (error) { - console.error(`[TRANSMUX] Transmuxing failed`, error) - reject(error) - } - }) -} \ No newline at end of file diff --git a/packages/server/services/files/services/videoTranscode/index.js b/packages/server/services/files/services/videoTranscode/index.js deleted file mode 100755 index 5d74f2a4..00000000 --- a/packages/server/services/files/services/videoTranscode/index.js +++ /dev/null @@ -1,98 +0,0 @@ -import path from "path" - -const ffmpeg = require("fluent-ffmpeg") - -const defaultParams = { - audioBitrate: 128, - videoBitrate: 1024, - videoCodec: "libvpx", - audioCodec: "libvorbis", - format: "mp4", -} - -const maxTasks = 5 - -export default (input, params = defaultParams) => { - return new Promise((resolve, reject) => { - if (!global.ffmpegTasks) { - global.ffmpegTasks = [] - } - - if (global.ffmpegTasks.length >= maxTasks) { - return reject(new Error("Too many transcoding tasks")) - } - - const outputFilename = `${path.basename(input).split(".")[0]}_ff.${params.format ?? "webm"}` - const outputFilepath = `${path.dirname(input)}/${outputFilename}` - - console.debug(`[TRANSCODING] Transcoding ${input} to ${outputFilepath}`) - - const onEnd = async () => { - console.debug( - `[TRANSCODING] Finished transcode ${input} to ${outputFilepath}`, - ) - - return resolve({ - filename: outputFilename, - filepath: outputFilepath, - }) - } - - const onError = (err) => { - console.error( - `[TRANSCODING] Transcoding ${input} to ${outputFilepath} failed`, - err, - ) - - return reject(err) - } - - let exec = null - - const commands = { - input: input, - ...params, - output: outputFilepath, - outputOptions: ["-preset veryfast"], - } - - // chain methods - for (let key in commands) { - if (exec === null) { - exec = ffmpeg(commands[key]) - continue - } - - if (key === "extraOptions" && Array.isArray(commands[key])) { - for (const option of commands[key]) { - exec = exec.inputOptions(option) - } - - continue - } - - if (key === "outputOptions" && Array.isArray(commands[key])) { - for (const option of commands[key]) { - exec = exec.outputOptions(option) - } - - continue - } - - if (typeof exec[key] !== "function") { - console.warn(`[TRANSCODING] Method ${key} is not a function`) - return false - } - - if (Array.isArray(commands[key])) { - exec = exec[key](...commands[key]) - } else { - exec = exec[key](commands[key]) - } - - continue - } - - exec.on("error", onError).on("end", onEnd).run() - }) -} diff --git a/packages/server/services/files/utils/bufferToStream/index.js b/packages/server/services/files/utils/bufferToStream/index.js new file mode 100644 index 00000000..e735c77a --- /dev/null +++ b/packages/server/services/files/utils/bufferToStream/index.js @@ -0,0 +1,10 @@ +import { Duplex } from "node:stream" + +export default (bf) => { + let tmp = new Duplex() + + tmp.push(bf) + tmp.push(null) + + return tmp +} diff --git a/packages/server/services/files/utils/readChunk/index.js b/packages/server/services/files/utils/readChunk/index.js new file mode 100644 index 00000000..3fe3a3c0 --- /dev/null +++ b/packages/server/services/files/utils/readChunk/index.js @@ -0,0 +1,22 @@ +// Original fork from https://github.com/sindresorhus/read-chunk +import { open } from "node:fs/promises" + +export default async (filePath, { length, startPosition }) => { + const fileDescriptor = await open(filePath, "r") + + try { + let { bytesRead, buffer } = await fileDescriptor.read({ + buffer: new Uint8Array(length), + length, + position: startPosition, + }) + + if (bytesRead < length) { + buffer = buffer.subarray(0, bytesRead) + } + + return buffer + } finally { + await fileDescriptor?.close() + } +}