diff --git a/packages/app/src/cores/index.js b/packages/app/src/cores/index.js index 8fd257d4..c6b08872 100755 --- a/packages/app/src/cores/index.js +++ b/packages/app/src/cores/index.js @@ -1,4 +1,5 @@ import SettingsCore from "./settings" +import TasksQueue from "./tasksQueue" import APICore from "./api" import StyleCore from "./style" import PermissionsCore from "./permissions" @@ -15,6 +16,8 @@ import RemoteStorage from "./remoteStorage" // DEFINE LOAD ORDER HERE export default [ SettingsCore, + TasksQueue, + APICore, PermissionsCore, StyleCore, diff --git a/packages/app/src/cores/remoteStorage/index.js b/packages/app/src/cores/remoteStorage/index.js index 744fa713..461fdc99 100755 --- a/packages/app/src/cores/remoteStorage/index.js +++ b/packages/app/src/cores/remoteStorage/index.js @@ -2,82 +2,10 @@ import Core from "evite/src/core" export default class RemoteStorage extends Core { static namespace = "remoteStorage" - static depends = ["api"] - - static maxRunningTasks = 3 + static depends = ["api", "tasksQueue"] public = { uploadFile: this.uploadFile.bind(this), - appendToQueue: this.appendToQueue.bind(this), - } - - runningTasksIds = [] - - taskQueue = [] - - processTasks() { - if (this.runningTasksIds.length >= RemoteStorage.maxRunningTasks) { - console.log("We are already running the maximum number of tasks") - return false - } - - // check if there are new tasks in the queue and move them to the tasks array with the maximum number of tasks can be run - if (this.taskQueue.length === 0) { - console.log("No tasks in the queue") - return false - } - - let tasks = this.taskQueue.splice(0, RemoteStorage.maxRunningTasks) - - tasks = tasks.filter((task) => task) - - const promises = tasks.map((task) => { - if (typeof task.fn !== "function") { - throw new Error("Task must be a function") - } - - if (typeof task.index !== "number") { - throw new Error("Task index must be a number") - } - - // add the task to the running tasks array - this.runningTasksIds.push(task.index) - - return task.fn().then((result) => { - // delete the task from the running tasks array - this.runningTasksIds = this.runningTasksIds.filter((tIndex) => tIndex !== task.index) - - return result - }).catch((error) => { - // delete the task from the running tasks array - this.runningTasksIds = this.runningTasksIds.filter((tIndex) => tIndex !== task.index) - - // propagate the error through an exception - throw error - }) - }) - - Promise.all(promises) - .then((res) => { - this.processTasks() - }) - .catch((error) => { - console.error(error) - this.processTasks() - }) - } - - appendToQueue(task) { - if (Array.isArray(task)) { - throw new Error("Task must be a function") - } - - this.taskQueue.unshift({ - index: this.taskQueue.length, - fn: task, - }) - - this.processTasks() } async getFileHash(file) { @@ -158,7 +86,7 @@ export default class RemoteStorage extends Core { } return new Promise((resolve, reject) => { - this.appendToQueue(async () => { + app.cores.tasksQueue.appendToQueue(fileHash, async () => { try { console.log(`Starting upload of file ${file.name}`) console.log("fileHash", fileHash) @@ -192,7 +120,7 @@ export default class RemoteStorage extends Core { resolve() } } - this.appendToQueue(() => this.uploadFile(file, callback)) + app.cores.tasksQueue.appendToQueue(() => this.uploadFile(file, callback)) }) }) diff --git a/packages/app/src/cores/tasksQueue/index.js b/packages/app/src/cores/tasksQueue/index.js new file mode 100644 index 00000000..d5150d73 --- /dev/null +++ b/packages/app/src/cores/tasksQueue/index.js @@ -0,0 +1,91 @@ +import Core from "evite/src/core" +import { Observable } from "object-observer" + +export default class TasksQueue extends Core { + static depends = ["settings"] + + static namespace = "tasksQueue" + + static get maxRunningTasks() { + return app.cores.settings.get("tasks.maxRunningTasks") ?? 3 + } + + public = { + appendToQueue: this.appendToQueue.bind(this), + processTasks: this.processTasks.bind(this), + } + + taskQueue = Observable.from([]) + + runningTasksIds = Observable.from([]) + + processTasks() { + if (this.runningTasksIds.length >= TasksQueue.maxRunningTasks ?? 1) { + console.log("We are already running the maximum number of tasks") + return false + } + + // check if there are new tasks in the queue and move them to the tasks array with the maximum number of tasks can be run + if (this.taskQueue.length === 0) { + console.log("No tasks in the queue") + return false + } + + let tasks = this.taskQueue.splice(0, TasksQueue.maxRunningTasks ?? 1) + + tasks = tasks.filter((task) => task) + + const promises = tasks.map(async (task) => { + if (typeof task.fn !== "function") { + throw new Error("Task must be a function") + } + + if (typeof task.id === "undefined") { + throw new Error("Task id is required") + } + + // add the task to the running tasks array + this.runningTasksIds.push(task.id) + + const taskResult = await task.fn() + .catch((error) => { + // delete the task from the running tasks array + this.runningTasksIds = this.runningTasksIds.filter((runningTaskId) => runningTaskId !== task.id) + + // propagate the error through an exception + throw error + }) + + // delete the task from the running tasks array + this.runningTasksIds = this.runningTasksIds.filter((runningTaskId) => runningTaskId !== task.id) + + return taskResult + }) + + Promise.all(promises) + .then((res) => { + this.processTasks() + }) + .catch((error) => { + console.error(error) + this.processTasks() + }) + } + + appendToQueue(taskId, task) { + if (!taskId) { + throw new Error("Task id is required") + } + + if (Array.isArray(task)) { + throw new Error("Task must be a function") + } + + this.taskQueue.unshift({ + id: taskId, + fn: task, + }) + + this.processTasks() + } +} \ No newline at end of file