From 4d0657c0dfbca801e781a40b9e082646e3107010 Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Wed, 5 Feb 2025 02:42:44 +0000 Subject: [PATCH] improve chunked uploads to support background jobs with sse events --- .../src/cores/remoteStorage/chunkedUpload.js | 70 ++++++++++++++++--- .../cores/remoteStorage/remoteStorage.core.js | 2 +- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/packages/app/src/cores/remoteStorage/chunkedUpload.js b/packages/app/src/cores/remoteStorage/chunkedUpload.js index b17f4605..95b4e562 100644 --- a/packages/app/src/cores/remoteStorage/chunkedUpload.js +++ b/packages/app/src/cores/remoteStorage/chunkedUpload.js @@ -1,4 +1,5 @@ -import { EventBus } from "vessel" +import { EventBus } from "@ragestudio/vessel" +import { events, stream } from "fetch-event-stream" export default class ChunkedUpload { constructor(params) { @@ -44,7 +45,9 @@ export default class ChunkedUpload { "uploader-original-name": encodeURIComponent(file.name), "uploader-file-id": this.getFileUID(file), "uploader-chunks-total": this.totalChunks, - "chunk-size": splitChunkSize + "chunk-size": splitChunkSize, + "Connection": "keep-alive", + "Cache-Control": "no-cache" } this.setupListeners() @@ -86,14 +89,20 @@ export default class ChunkedUpload { this.headers["uploader-chunk-number"] = this.chunkCount + console.log(`[UPLOADER] Sending chunk ${this.chunkCount}`, { + currentChunk: this.chunkCount, + totalChunks: this.totalChunks, + }) + try { const res = await fetch( this.endpoint, { method: "POST", headers: this.headers, - body: form - }) + body: form, + }, + ) return res } catch (error) { @@ -120,11 +129,56 @@ export default class ChunkedUpload { const res = await this.sendChunk() + const data = await res.json() + if ([200, 201, 204].includes(res.status)) { - if (++this.chunkCount < this.totalChunks) { + console.log(`[UPLOADER] Chunk ${this.chunkCount} sent`) + + this.chunkCount = this.chunkCount + 1 + + if (this.chunkCount < this.totalChunks) { this.nextSend() - } else { - res.json().then((body) => this.events.emit("finish", body)) + } + + // check if is the last chunk, if so, handle sse events + if (this.chunkCount === this.totalChunks) { + if (data.eventChannelURL) { + console.log(`[UPLOADER] Connecting to SSE channel >`, data.eventChannelURL) + + const eventSource = new EventSource(data.eventChannelURL) + + eventSource.onerror = (error) => { + this.events.emit("error", error) + } + + eventSource.onopen = () => { + console.log(`[UPLOADER] SSE channel opened`) + } + + eventSource.onmessage = (event) => { + // parse json + const messageData = JSON.parse(event.data) + + console.log(`[UPLOADER] SSE Event >`, messageData) + + if (messageData.status === "done") { + this.events.emit("finish", messageData.result) + eventSource.close() + } + + if (messageData.status === "error") { + this.events.emit("error", messageData.result) + } + + if (messageData.status === "progress") { + this.events.emit("progress", { + percentProgress: messageData.progress, + }) + } + } + } else { + this.events.emit("finish", data) + } } this.events.emit("progress", { @@ -133,7 +187,7 @@ export default class ChunkedUpload { } else if ([408, 502, 503, 504].includes(res.status)) { this.manageRetries() } else { - res.json().then((body) => this.events.emit("error", { message: `[${res.status}] ${body.error ?? body.message}` })) + this.events.emit("error", { message: `[${res.status}] ${data.error ?? data.message}` }) } } diff --git a/packages/app/src/cores/remoteStorage/remoteStorage.core.js b/packages/app/src/cores/remoteStorage/remoteStorage.core.js index ce926f5d..d3938038 100755 --- a/packages/app/src/cores/remoteStorage/remoteStorage.core.js +++ b/packages/app/src/cores/remoteStorage/remoteStorage.core.js @@ -1,4 +1,4 @@ -import { Core } from "vessel" +import { Core } from "@ragestudio/vessel" import ChunkedUpload from "./chunkedUpload" import SessionModel from "@models/session"