improve chunked uploads to support background jobs with SSE events

SrGooglo 2025-02-05 02:42:44 +00:00
parent 11b52595c1
commit 4d0657c0df
2 changed files with 63 additions and 9 deletions
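Summary of the change: after the last chunk is uploaded, the client now reads the JSON response body; if it contains an eventChannelURL, processing continues as a server-side background job and the uploader subscribes to that URL with EventSource, translating the SSE messages into its existing "progress", "finish" and "error" events. A minimal sketch of the payload shapes implied by the new client code (the server side is not part of this commit, so the URL and the placeholder fields below are illustrative assumptions):

// Last-chunk response when processing is deferred to a background job
// (only eventChannelURL is read by the client; the URL itself is hypothetical)
const lastChunkResponse = {
	eventChannelURL: "https://api.example.com/events/upload-job-123",
}

// SSE messages the client expects on that channel (JSON in event.data)
const progressMessage = { status: "progress", progress: 42 } // re-emitted as "progress" ({ percentProgress: 42 })
const doneMessage = { status: "done", result: { /* final upload result */ } } // re-emitted as "finish", then the channel is closed
const errorMessage = { status: "error", result: { /* error details */ } } // re-emitted as "error"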

View File

@@ -1,4 +1,5 @@
-import { EventBus } from "vessel"
+import { EventBus } from "@ragestudio/vessel"
+import { events, stream } from "fetch-event-stream"
 
 export default class ChunkedUpload {
 	constructor(params) {
@@ -44,7 +45,9 @@ export default class ChunkedUpload
 			"uploader-original-name": encodeURIComponent(file.name),
 			"uploader-file-id": this.getFileUID(file),
 			"uploader-chunks-total": this.totalChunks,
-			"chunk-size": splitChunkSize
+			"chunk-size": splitChunkSize,
+			"Connection": "keep-alive",
+			"Cache-Control": "no-cache"
 		}
 
 		this.setupListeners()
@@ -86,14 +89,20 @@ export default class ChunkedUpload
 		this.headers["uploader-chunk-number"] = this.chunkCount
 
+		console.log(`[UPLOADER] Sending chunk ${this.chunkCount}`, {
+			currentChunk: this.chunkCount,
+			totalChunks: this.totalChunks,
+		})
+
 		try {
 			const res = await fetch(
 				this.endpoint,
 				{
 					method: "POST",
 					headers: this.headers,
-					body: form
-				})
+					body: form,
+				},
+			)
 
 			return res
 		} catch (error) {
@@ -120,11 +129,56 @@ export default class ChunkedUpload
 		const res = await this.sendChunk()
+		const data = await res.json()
 
 		if ([200, 201, 204].includes(res.status)) {
-			if (++this.chunkCount < this.totalChunks) {
+			console.log(`[UPLOADER] Chunk ${this.chunkCount} sent`)
+
+			this.chunkCount = this.chunkCount + 1
+
+			if (this.chunkCount < this.totalChunks) {
 				this.nextSend()
-			} else {
-				res.json().then((body) => this.events.emit("finish", body))
 			}
+
+			// check if is the last chunk, if so, handle sse events
+			if (this.chunkCount === this.totalChunks) {
+				if (data.eventChannelURL) {
+					console.log(`[UPLOADER] Connecting to SSE channel >`, data.eventChannelURL)
+
+					const eventSource = new EventSource(data.eventChannelURL)
+
+					eventSource.onerror = (error) => {
+						this.events.emit("error", error)
+					}
+
+					eventSource.onopen = () => {
+						console.log(`[UPLOADER] SSE channel opened`)
+					}
+
+					eventSource.onmessage = (event) => {
+						// parse json
+						const messageData = JSON.parse(event.data)
+
+						console.log(`[UPLOADER] SSE Event >`, messageData)
+
+						if (messageData.status === "done") {
+							this.events.emit("finish", messageData.result)
+							eventSource.close()
+						}
+
+						if (messageData.status === "error") {
+							this.events.emit("error", messageData.result)
+						}
+
+						if (messageData.status === "progress") {
+							this.events.emit("progress", {
+								percentProgress: messageData.progress,
+							})
+						}
+					}
+				} else {
+					this.events.emit("finish", data)
+				}
+			}
 
 			this.events.emit("progress", {
@@ -133,7 +187,7 @@ export default class ChunkedUpload
 		} else if ([408, 502, 503, 504].includes(res.status)) {
 			this.manageRetries()
 		} else {
-			res.json().then((body) => this.events.emit("error", { message: `[${res.status}] ${body.error ?? body.message}` }))
+			this.events.emit("error", { message: `[${res.status}] ${data.error ?? data.message}` })
 		}
 	}
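From the caller's perspective the public surface is unchanged: the uploader still emits "progress", "finish" and "error", but "finish" may now arrive only once the server-side background job reports completion over SSE. A rough usage sketch, assuming the constructor accepts the endpoint, file and chunk size and that the vessel EventBus exposes an .on() subscription method (the parameter names and endpoint below are illustrative, not taken from this diff):

const upload = new ChunkedUpload({
	endpoint: "https://api.example.com/upload/chunk", // hypothetical endpoint
	file: selectedFile,                               // a File/Blob picked by the user
	splitChunkSize: 5 * 1024 * 1024,                  // 5 MB chunks, illustrative
})

upload.events.on("progress", ({ percentProgress }) => {
	console.log(`upload progress: ${percentProgress}%`)
})
upload.events.on("finish", (result) => {
	// with SSE, this fires when the background job finishes, not when the last chunk lands
	console.log("upload processed:", result)
})
upload.events.on("error", (error) => {
	console.error("upload failed:", error)
})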

View File

@@ -1,4 +1,4 @@
-import { Core } from "vessel"
+import { Core } from "@ragestudio/vessel"
 import ChunkedUpload from "./chunkedUpload"
 import SessionModel from "@models/session"