Update StorageClient with CDN support and B2 integration

SrGooglo 2025-04-24 08:59:28 +00:00
parent ad860f51d3
commit 0c7b6d7720
5 changed files with 142 additions and 110 deletions

View File

@@ -1,106 +1,118 @@
-const Minio = require("minio")
-import path from "path"
+import path from "node:path"
+import { Client } from "minio"
 
 export const generateDefaultBucketPolicy = (payload) => {
     const { bucketName } = payload
 
     if (!bucketName) {
         throw new Error("bucketName is required")
     }
 
     return {
         Version: "2012-10-17",
         Statement: [
             {
-                Action: [
-                    "s3:GetObject"
-                ],
+                Action: ["s3:GetObject"],
                 Effect: "Allow",
                 Principal: {
-                    AWS: [
-                        "*"
-                    ]
+                    AWS: ["*"],
                 },
-                Resource: [
-                    `arn:aws:s3:::${bucketName}/*`
-                ],
-                Sid: ""
-            }
-        ]
-    }
+                Resource: [`arn:aws:s3:::${bucketName}/*`],
+                Sid: "",
+            },
+        ],
+    }
 }
 
-export class StorageClient extends Minio.Client {
+export class StorageClient extends Client {
     constructor(options) {
         super(options)
 
         this.defaultBucket = String(options.defaultBucket)
         this.defaultRegion = String(options.defaultRegion)
+        this.setupBucket = Boolean(options.setupBucket)
+        this.cdnUrl = options.cdnUrl
     }
 
     composeRemoteURL = (key, extraKey) => {
         let _path = path.join(this.defaultBucket, key)
 
         if (typeof extraKey === "string") {
             _path = path.join(_path, extraKey)
         }
 
+        if (this.cdnUrl) {
+            return `${this.cdnUrl}/${_path}`
+        }
+
         return `${this.protocol}//${this.host}:${this.port}/${_path}`
     }
 
     setDefaultBucketPolicy = async (bucketName) => {
         const policy = generateDefaultBucketPolicy({ bucketName })
 
         return this.setBucketPolicy(bucketName, JSON.stringify(policy))
     }
 
     initialize = async () => {
         console.log("🔌 Checking if storage client have default bucket...")
 
-        try {
-            const bucketExists = await this.bucketExists(this.defaultBucket)
+        if (this.setupBucket !== false) {
+            try {
+                const bucketExists = await this.bucketExists(this.defaultBucket)
 
-            if (!bucketExists) {
-                console.warn("🪣 Default bucket not exists! Creating new bucket...")
+                if (!bucketExists) {
+                    console.warn(
+                        "🪣 Default bucket not exists! Creating new bucket...",
+                    )
 
-                await this.makeBucket(this.defaultBucket, "s3")
+                    await this.makeBucket(this.defaultBucket, "s3")
 
-                // set default bucket policy
-                await this.setDefaultBucketPolicy(this.defaultBucket)
-            }
-        } catch (error) {
-            console.error(`Failed to check if default bucket exists or create default bucket >`, error)
-        }
+                    // set default bucket policy
+                    await this.setDefaultBucketPolicy(this.defaultBucket)
+                }
+            } catch (error) {
+                console.error(
+                    `Failed to check if default bucket exists or create default bucket >`,
+                    error,
+                )
+            }
 
-        try {
-            // check if default bucket policy exists
-            const bucketPolicy = await this.getBucketPolicy(this.defaultBucket).catch(() => {
-                return null
-            })
+            try {
+                // check if default bucket policy exists
+                const bucketPolicy = await this.getBucketPolicy(
+                    this.defaultBucket,
+                ).catch(() => {
+                    return null
+                })
 
-            if (!bucketPolicy) {
-                // set default bucket policy
-                await this.setDefaultBucketPolicy(this.defaultBucket)
-            }
-        } catch (error) {
-            console.error(`Failed to get or set default bucket policy >`, error)
-        }
+                if (!bucketPolicy) {
+                    // set default bucket policy
+                    await this.setDefaultBucketPolicy(this.defaultBucket)
+                }
+            } catch (error) {
+                console.error(
+                    `Failed to get or set default bucket policy >`,
+                    error,
+                )
+            }
+        }
 
         console.log("✅ Storage client is ready.")
     }
 }
 
 export const createStorageClientInstance = (options) => {
     return new StorageClient({
         endPoint: process.env.S3_ENDPOINT,
         port: Number(process.env.S3_PORT),
         useSSL: ToBoolean(process.env.S3_USE_SSL),
         accessKey: process.env.S3_ACCESS_KEY,
         secretKey: process.env.S3_SECRET_KEY,
         defaultBucket: process.env.S3_BUCKET,
         defaultRegion: process.env.S3_REGION,
         ...options,
     })
 }
 
 export default createStorageClientInstance
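
The practical effect of the new cdnUrl option is in composeRemoteURL: when a CDN base URL is set, public URLs are composed against it instead of the raw S3 endpoint. A minimal usage sketch, not part of the commit; the endpoint, bucket, and CDN values are made up, and credentials are assumed to come from the environment as in createStorageClientInstance:

    const client = createStorageClientInstance({
        endPoint: "s3.example.com", // hypothetical
        port: 443,
        useSSL: true,
        defaultBucket: "uploads", // hypothetical
        cdnUrl: "https://cdn.example.com", // hypothetical
    })

    client.composeRemoteURL("user_123/abc")
    // with cdnUrl set -> "https://cdn.example.com/uploads/user_123/abc"
    // without cdnUrl  -> "https://s3.example.com:443/uploads/user_123/abc"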

View File

@@ -23,6 +23,7 @@ export type S3UploadPayload = {
     filePath: string
     basePath: string
     targetPath?: string
+    s3Provider?: string
     onProgress?: Function
 }
@@ -38,8 +39,9 @@ export default class Upload {
         const result = await Upload.toS3({
             filePath: payload.filePath,
             targetPath: payload.targetPath,
-            basePath: `${payload.user_id}/${global.nanoid()}`,
+            basePath: payload.user_id,
             onProgress: payload.onProgress,
+            s3Provider: payload.s3Provider,
         })
 
         // delete workpath
@@ -76,21 +78,22 @@
     }
 
     static toS3 = async (payload: S3UploadPayload) => {
-        const { filePath, basePath, targetPath, onProgress } = payload
+        const { filePath, basePath, targetPath, s3Provider, onProgress } =
+            payload
 
         // if targetPath is provided, means its a directory
-        const isDirectory = targetPath !== undefined
-
-        let uploadPath = path.resolve(basePath, path.basename(filePath))
-
-        if (isDirectory) {
-            uploadPath = basePath
-        }
+        const isDirectory = !!targetPath
 
         const metadata = await this.buildFileMetadata(
             isDirectory ? targetPath : filePath,
         )
 
+        let uploadPath = path.join(basePath, metadata["File-Hash"])
+
+        if (isDirectory) {
+            uploadPath = path.join(basePath, nanoid())
+        }
+
         if (typeof onProgress === "function") {
             onProgress({
                 percent: 0,
@@ -98,19 +101,21 @@
             })
         }
 
-        console.log("Uploading to S3:", {
-            filePath,
-            uploadPath,
-            basePath,
-            targetPath,
-            metadata,
-        })
+        // console.log("Uploading to S3:", {
+        //     filePath: filePath,
+        //     basePath: basePath,
+        //     uploadPath: uploadPath,
+        //     targetPath: targetPath,
+        //     metadata: metadata,
+        //     s3Provider: s3Provider,
+        // })
 
         const result = await putObject({
             filePath: filePath,
             uploadPath: uploadPath,
             metadata: metadata,
             targetFilename: isDirectory ? path.basename(targetPath) : null,
+            provider: s3Provider,
         })
 
         return result
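
Two behavioral changes here are easy to miss: single files are now keyed by their content hash (metadata["File-Hash"]) directly under the caller's basePath, replacing the random `${user_id}/${global.nanoid()}` prefix, and the chosen provider travels with the payload into putObject. A hedged sketch of a call, with hypothetical paths and user id:

    const result = await Upload.toS3({
        filePath: "/tmp/uploads/photo.png", // hypothetical
        basePath: "user_123", // hypothetical
        s3Provider: "b2", // resolved against global.storages inside putObject
    })
    // uploadPath becomes "user_123/<File-Hash>", so re-uploading identical
    // content maps to the same object key; directory uploads still get a
    // random nanoid() key instead.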

View File

@@ -8,7 +8,14 @@ export default async function putObject({
     metadata = {},
     targetFilename,
     onFinish,
+    provider = "standard",
 }) {
+    const providerClass = global.storages[provider]
+
+    if (!providerClass) {
+        throw new Error(`Provider [${provider}] not found`)
+    }
+
     const isDirectory = await fs.promises
         .lstat(filePath)
         .then((stats) => stats.isDirectory())
@@ -31,13 +38,13 @@
         return {
             id: uploadPath,
-            url: global.storage.composeRemoteURL(uploadPath, targetFilename),
+            url: providerClass.composeRemoteURL(uploadPath, targetFilename),
             metadata: metadata,
         }
     }
 
     // upload to storage
-    await global.storage.fPutObject(
+    await providerClass.fPutObject(
         process.env.S3_BUCKET,
         uploadPath,
         filePath,
@@ -46,7 +53,7 @@
     const result = {
         id: uploadPath,
-        url: global.storage.composeRemoteURL(uploadPath),
+        url: providerClass.composeRemoteURL(uploadPath),
         metadata: metadata,
     }
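
putObject now resolves its storage client per call from the global.storages registry instead of always using the single global.storage instance, so one helper can write to either backend. Illustration only, with hypothetical paths:

    // provider defaults to "standard" when omitted
    await putObject({ filePath: "/tmp/a.bin", uploadPath: "user_123/a" })

    // route the same upload to the B2-backed client instead
    await putObject({
        filePath: "/tmp/a.bin",
        uploadPath: "user_123/a",
        provider: "b2",
    })

    // an unregistered provider fails fast, before any filesystem or
    // network work: Error: Provider [r2] not found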

View File

@@ -25,13 +25,13 @@ class API extends Server {
     contexts = {
         db: new DbManager(),
         cache: new CacheService(),
-        storage: StorageClient(),
-        b2Storage: null,
         SSEManager: new SSEManager(),
         redis: RedisClient({
             maxRetriesPerRequest: null,
         }),
         limits: {},
+        storage: StorageClient(),
+        b2Storage: null,
     }
 
     queuesManager = new TaskQueueManager(
@@ -45,14 +45,18 @@
         global.sse = this.contexts.SSEManager
 
         if (process.env.B2_KEY_ID && process.env.B2_APP_KEY) {
-            this.contexts.b2Storage = new B2({
-                applicationKeyId: process.env.B2_KEY_ID,
-                applicationKey: process.env.B2_APP_KEY,
+            this.contexts.b2Storage = StorageClient({
+                endPoint: process.env.B2_ENDPOINT,
+                cdnUrl: process.env.B2_CDN_ENDPOINT,
+                defaultBucket: process.env.B2_BUCKET,
+                accessKey: process.env.B2_KEY_ID,
+                secretKey: process.env.B2_APP_KEY,
+                port: 443,
+                useSSL: true,
+                setupBucket: false,
             })
 
-            global.b2Storage = this.contexts.b2Storage
-            await this.contexts.b2Storage.authorize()
+            await this.contexts.b2Storage.initialize()
         } else {
             console.warn(
                 "B2 storage not configured on environment, skipping...",
@@ -66,7 +70,10 @@
         await this.contexts.db.initialize()
         await this.contexts.storage.initialize()
 
-        global.storage = this.contexts.storage
+        global.storages = {
+            standard: this.contexts.storage,
+            b2: this.contexts.b2Storage,
+        }
 
         global.queues = this.queuesManager
 
         this.contexts.limits = await LimitsClass.get()
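
Since the B2 client is now just a second StorageClient pointed at Backblaze's S3-compatible API, wiring it up is purely environment configuration; setupBucket: false makes initialize() skip the bucket-existence check and policy write for it. A sketch of the variables the commit reads, with made-up values:

    B2_ENDPOINT=s3.us-west-004.backblazeb2.com   # S3-compatible endpoint (value hypothetical)
    B2_CDN_ENDPOINT=https://cdn.example.com      # composeRemoteURL builds public URLs on this
    B2_BUCKET=my-app-files                       # hypothetical
    B2_KEY_ID=...                                # B2 application key id
    B2_APP_KEY=...                               # B2 application key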

View File

@@ -35,7 +35,7 @@ export default {
             1024 *
             1024,
         useCompression: true,
-        useProvider: "standard",
+        useProvider: req.headers["use-provider"] ?? "standard",
     }
 
     // const user = await req.auth.user()
@@ -85,6 +85,7 @@
         filePath: assemble.filePath,
         workPath: workPath,
         transformations: transformations,
+        s3Provider: config.useProvider,
     }
 
     // if has transformations, use background job
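
With this last change the backend is selected per request: a use-provider header picks the entry in global.storages, falling back to "standard" when absent, and the value rides along as s3Provider into the upload job. A client-side sketch with a hypothetical route; only the header name comes from the handler:

    await fetch("https://api.example.com/upload", { // hypothetical route
        method: "POST",
        headers: {
            "use-provider": "b2", // omit to use the default S3 client
            // auth and chunk headers omitted
        },
        body: chunk,
    })

The header value is used as-is, so an unrecognized provider surfaces later as the "Provider [...] not found" error thrown in putObject.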