diff --git a/packages/server/classes/StorageClient/index.js b/packages/server/classes/StorageClient/index.js index 810d54ce..c0c97a10 100755 --- a/packages/server/classes/StorageClient/index.js +++ b/packages/server/classes/StorageClient/index.js @@ -1,106 +1,118 @@ -const Minio = require("minio") -import path from "path" +import path from "node:path" +import { Client } from "minio" export const generateDefaultBucketPolicy = (payload) => { - const { bucketName } = payload + const { bucketName } = payload - if (!bucketName) { - throw new Error("bucketName is required") - } + if (!bucketName) { + throw new Error("bucketName is required") + } - return { - Version: "2012-10-17", - Statement: [ - { - Action: [ - "s3:GetObject" - ], - Effect: "Allow", - Principal: { - AWS: [ - "*" - ] - }, - Resource: [ - `arn:aws:s3:::${bucketName}/*` - ], - Sid: "" - } - ] - } + return { + Version: "2012-10-17", + Statement: [ + { + Action: ["s3:GetObject"], + Effect: "Allow", + Principal: { + AWS: ["*"], + }, + Resource: [`arn:aws:s3:::${bucketName}/*`], + Sid: "", + }, + ], + } } -export class StorageClient extends Minio.Client { - constructor(options) { - super(options) +export class StorageClient extends Client { + constructor(options) { + super(options) - this.defaultBucket = String(options.defaultBucket) - this.defaultRegion = String(options.defaultRegion) - } + this.defaultBucket = String(options.defaultBucket) + this.defaultRegion = String(options.defaultRegion) + this.setupBucket = Boolean(options.setupBucket) + this.cdnUrl = options.cdnUrl + } - composeRemoteURL = (key, extraKey) => { - let _path = path.join(this.defaultBucket, key) + composeRemoteURL = (key, extraKey) => { + let _path = path.join(this.defaultBucket, key) - if (typeof extraKey === "string") { - _path = path.join(_path, extraKey) - } + if (typeof extraKey === "string") { + _path = path.join(_path, extraKey) + } - return `${this.protocol}//${this.host}:${this.port}/${_path}` - } + if (this.cdnUrl) { + return `${this.cdnUrl}/${_path}` + } - setDefaultBucketPolicy = async (bucketName) => { - const policy = generateDefaultBucketPolicy({ bucketName }) + return `${this.protocol}//${this.host}:${this.port}/${_path}` + } - return this.setBucketPolicy(bucketName, JSON.stringify(policy)) - } + setDefaultBucketPolicy = async (bucketName) => { + const policy = generateDefaultBucketPolicy({ bucketName }) - initialize = async () => { - console.log("🔌 Checking if storage client have default bucket...") + return this.setBucketPolicy(bucketName, JSON.stringify(policy)) + } - try { - const bucketExists = await this.bucketExists(this.defaultBucket) + initialize = async () => { + console.log("🔌 Checking if storage client have default bucket...") - if (!bucketExists) { - console.warn("🪣 Default bucket not exists! Creating new bucket...") + if (this.setupBucket !== false) { + try { + const bucketExists = await this.bucketExists(this.defaultBucket) - await this.makeBucket(this.defaultBucket, "s3") + if (!bucketExists) { + console.warn( + "🪣 Default bucket not exists! Creating new bucket...", + ) - // set default bucket policy - await this.setDefaultBucketPolicy(this.defaultBucket) - } - } catch (error) { - console.error(`Failed to check if default bucket exists or create default bucket >`, error) - } + await this.makeBucket(this.defaultBucket, "s3") - try { - // check if default bucket policy exists - const bucketPolicy = await this.getBucketPolicy(this.defaultBucket).catch(() => { - return null - }) + // set default bucket policy + await this.setDefaultBucketPolicy(this.defaultBucket) + } + } catch (error) { + console.error( + `Failed to check if default bucket exists or create default bucket >`, + error, + ) + } - if (!bucketPolicy) { - // set default bucket policy - await this.setDefaultBucketPolicy(this.defaultBucket) - } - } catch (error) { - console.error(`Failed to get or set default bucket policy >`, error) - } + try { + // check if default bucket policy exists + const bucketPolicy = await this.getBucketPolicy( + this.defaultBucket, + ).catch(() => { + return null + }) - console.log("✅ Storage client is ready.") - } + if (!bucketPolicy) { + // set default bucket policy + await this.setDefaultBucketPolicy(this.defaultBucket) + } + } catch (error) { + console.error( + `Failed to get or set default bucket policy >`, + error, + ) + } + } + + console.log("✅ Storage client is ready.") + } } export const createStorageClientInstance = (options) => { - return new StorageClient({ - endPoint: process.env.S3_ENDPOINT, - port: Number(process.env.S3_PORT), - useSSL: ToBoolean(process.env.S3_USE_SSL), - accessKey: process.env.S3_ACCESS_KEY, - secretKey: process.env.S3_SECRET_KEY, - defaultBucket: process.env.S3_BUCKET, - defaultRegion: process.env.S3_REGION, - ...options, - }) + return new StorageClient({ + endPoint: process.env.S3_ENDPOINT, + port: Number(process.env.S3_PORT), + useSSL: ToBoolean(process.env.S3_USE_SSL), + accessKey: process.env.S3_ACCESS_KEY, + secretKey: process.env.S3_SECRET_KEY, + defaultBucket: process.env.S3_BUCKET, + defaultRegion: process.env.S3_REGION, + ...options, + }) } -export default createStorageClientInstance \ No newline at end of file +export default createStorageClientInstance diff --git a/packages/server/services/files/classes/Upload/index.ts b/packages/server/services/files/classes/Upload/index.ts index 712aca04..684894fd 100644 --- a/packages/server/services/files/classes/Upload/index.ts +++ b/packages/server/services/files/classes/Upload/index.ts @@ -23,6 +23,7 @@ export type S3UploadPayload = { filePath: string basePath: string targetPath?: string + s3Provider?: string onProgress?: Function } @@ -38,8 +39,9 @@ export default class Upload { const result = await Upload.toS3({ filePath: payload.filePath, targetPath: payload.targetPath, - basePath: `${payload.user_id}/${global.nanoid()}`, + basePath: payload.user_id, onProgress: payload.onProgress, + s3Provider: payload.s3Provider, }) // delete workpath @@ -76,21 +78,22 @@ export default class Upload { } static toS3 = async (payload: S3UploadPayload) => { - const { filePath, basePath, targetPath, onProgress } = payload + const { filePath, basePath, targetPath, s3Provider, onProgress } = + payload // if targetPath is provided, means its a directory - const isDirectory = targetPath !== undefined - - let uploadPath = path.resolve(basePath, path.basename(filePath)) - - if (isDirectory) { - uploadPath = basePath - } + const isDirectory = !!targetPath const metadata = await this.buildFileMetadata( isDirectory ? targetPath : filePath, ) + let uploadPath = path.join(basePath, metadata["File-Hash"]) + + if (isDirectory) { + uploadPath = path.join(basePath, nanoid()) + } + if (typeof onProgress === "function") { onProgress({ percent: 0, @@ -98,19 +101,21 @@ export default class Upload { }) } - console.log("Uploading to S3:", { - filePath, - uploadPath, - basePath, - targetPath, - metadata, - }) + // console.log("Uploading to S3:", { + // filePath: filePath, + // basePath: basePath, + // uploadPath: uploadPath, + // targetPath: targetPath, + // metadata: metadata, + // s3Provider: s3Provider, + // }) const result = await putObject({ filePath: filePath, uploadPath: uploadPath, metadata: metadata, targetFilename: isDirectory ? path.basename(targetPath) : null, + provider: s3Provider, }) return result diff --git a/packages/server/services/files/classes/Upload/putObject.js b/packages/server/services/files/classes/Upload/putObject.js index bc8ec7c0..91c81f74 100644 --- a/packages/server/services/files/classes/Upload/putObject.js +++ b/packages/server/services/files/classes/Upload/putObject.js @@ -8,7 +8,14 @@ export default async function putObject({ metadata = {}, targetFilename, onFinish, + provider = "standard", }) { + const providerClass = global.storages[provider] + + if (!providerClass) { + throw new Error(`Provider [${provider}] not found`) + } + const isDirectory = await fs.promises .lstat(filePath) .then((stats) => stats.isDirectory()) @@ -31,13 +38,13 @@ export default async function putObject({ return { id: uploadPath, - url: global.storage.composeRemoteURL(uploadPath, targetFilename), + url: providerClass.composeRemoteURL(uploadPath, targetFilename), metadata: metadata, } } // upload to storage - await global.storage.fPutObject( + await providerClass.fPutObject( process.env.S3_BUCKET, uploadPath, filePath, @@ -46,7 +53,7 @@ export default async function putObject({ const result = { id: uploadPath, - url: global.storage.composeRemoteURL(uploadPath), + url: providerClass.composeRemoteURL(uploadPath), metadata: metadata, } diff --git a/packages/server/services/files/file.service.js b/packages/server/services/files/file.service.js index a7c80f10..835088fc 100755 --- a/packages/server/services/files/file.service.js +++ b/packages/server/services/files/file.service.js @@ -25,13 +25,13 @@ class API extends Server { contexts = { db: new DbManager(), cache: new CacheService(), - storage: StorageClient(), - b2Storage: null, SSEManager: new SSEManager(), redis: RedisClient({ maxRetriesPerRequest: null, }), limits: {}, + storage: StorageClient(), + b2Storage: null, } queuesManager = new TaskQueueManager( @@ -45,14 +45,18 @@ class API extends Server { global.sse = this.contexts.SSEManager if (process.env.B2_KEY_ID && process.env.B2_APP_KEY) { - this.contexts.b2Storage = new B2({ - applicationKeyId: process.env.B2_KEY_ID, - applicationKey: process.env.B2_APP_KEY, + this.contexts.b2Storage = StorageClient({ + endPoint: process.env.B2_ENDPOINT, + cdnUrl: process.env.B2_CDN_ENDPOINT, + defaultBucket: process.env.B2_BUCKET, + accessKey: process.env.B2_KEY_ID, + secretKey: process.env.B2_APP_KEY, + port: 443, + useSSL: true, + setupBucket: false, }) - global.b2Storage = this.contexts.b2Storage - - await this.contexts.b2Storage.authorize() + await this.contexts.b2Storage.initialize() } else { console.warn( "B2 storage not configured on environment, skipping...", @@ -66,7 +70,10 @@ class API extends Server { await this.contexts.db.initialize() await this.contexts.storage.initialize() - global.storage = this.contexts.storage + global.storages = { + standard: this.contexts.storage, + b2: this.contexts.b2Storage, + } global.queues = this.queuesManager this.contexts.limits = await LimitsClass.get() diff --git a/packages/server/services/files/routes/upload/chunk/post.js b/packages/server/services/files/routes/upload/chunk/post.js index c1111e01..5fc3ed32 100644 --- a/packages/server/services/files/routes/upload/chunk/post.js +++ b/packages/server/services/files/routes/upload/chunk/post.js @@ -35,7 +35,7 @@ export default { 1024 * 1024, useCompression: true, - useProvider: "standard", + useProvider: req.headers["use-provider"] ?? "standard", } // const user = await req.auth.user() @@ -85,6 +85,7 @@ export default { filePath: assemble.filePath, workPath: workPath, transformations: transformations, + s3Provider: config.useProvider, } // if has transformations, use background job