diff --git a/client/src/rtengine/index.js b/client/src/rtengine/index.js index 870b7a9..6bc1078 100755 --- a/client/src/rtengine/index.js +++ b/client/src/rtengine/index.js @@ -1,5 +1,6 @@ import TopicsController from "./topics" import { version } from "../../package.json" +import path from "node:path" /** * WebSocket client for real-time communication with a backend service. @@ -28,6 +29,8 @@ export class RTEngineClient { } } + abortController = new AbortController() + /** @type {string} Client library version */ static version = version /** @type {number} Timeout for heartbeat checks in milliseconds */ @@ -84,6 +87,10 @@ export class RTEngineClient { * @returns {Promise} Promise that resolves when the connection is established. */ async connect() { + if (this.abortController.signal.aborted) { + return null + } + if (this.socket) { await this.disconnect() } @@ -126,6 +133,27 @@ export class RTEngineClient { return true } + /** + * Permanently destroys the client connection, closes the socket, + * cancels any pending reconnection attempts, and prevents further reconnection. + * + * @returns {Promise} A promise that resolves when the connection is destroyed. + */ + async destroy() { + console.log(`[rt] Destroying connection`) + + if (this.socket) { + this.socket.close() + this.socket = null + } + + // cancel reconnecttions if any + this.state.reconnecting = false + this.state.reconnectAttempts = 0 + + this.abortController.abort() + } + /** * Registers an event handler. * @@ -183,6 +211,30 @@ export class RTEngineClient { return await this.socket.send(this.#_encode({ event, data })) } + /** + * Sends an event to the WebSocket server and returns a message from the server. + * + * @param {string} event - Event name to emit. + * @param {*} data - Data to send with the event. + * @returns {Promise} Promise that resolves with the message from the server. + */ + call = (event, data) => { + return new Promise((resolve, reject) => { + this.once(`ack_${event}`, (...args) => { + resolve(...args) + }) + + this.socket.send(this.#_encode({ event, data, ack: true })) + }) + } + + /** + * Removes all event listeners registered. + */ + removeAllListeners = () => { + this.handlers.clear() + } + /** * Encodes a payload to JSON string. * @@ -270,6 +322,8 @@ export class RTEngineClient { throw new Error("Invalid event or payload") } + this.#dispatchToHandlers("message", payload.data, payload) + return this.#dispatchToHandlers( payload.event, payload.data, @@ -314,10 +368,7 @@ export class RTEngineClient { * @param {Error} error - Error object. */ error: (error) => { - console.error( - `[rt/${this.params.refName}] Connection error:`, - error, - ) + console.error(`[rt/${this.params.refName}] error:`, error) }, /** * Handles pong responses for heartbeat. @@ -370,6 +421,10 @@ export class RTEngineClient { * @returns {null|void} Null if max retries reached, void otherwise. */ #tryReconnect() { + if (this.abortController.signal.aborted) { + return null + } + // check if retries are left, if so, retry connection if ( this.params.maxConnectRetries !== Infinity && @@ -403,14 +458,14 @@ export class RTEngineClient { * @param {Object} [payload] - Full event payload. * @returns {Promise} Promise that resolves when all handlers have been called. */ - async #dispatchToHandlers(event, data) { + async #dispatchToHandlers(event, ...args) { if (this.baseHandlers[event]) { - await this.baseHandlers[event](data) + await this.baseHandlers[event](...args) } for (const handler of this.handlers) { if (handler.event === event) { - handler.handler(data) + handler.handler(...args) if (handler.once === true) { this.handlers.delete(handler)