diff --git a/client/src/rtengine/index.js b/client/src/rtengine/index.js index 21bbfc7..870b7a9 100644 --- a/client/src/rtengine/index.js +++ b/client/src/rtengine/index.js @@ -1,24 +1,88 @@ import TopicsController from "./topics" +import { version } from "../../package.json" +/** + * WebSocket client for real-time communication with a backend service. + * Provides connection management, automatic reconnection, heartbeat monitoring, + * event handling, and topic-based subscriptions. + */ export class RTEngineClient { + /** + * Creates a new RTEngineClient instance. + * + * @param {Object} [params={}] - Configuration parameters for the client. + * @param {string} [params.refName="default"] - Reference name for this client instance. + * @param {boolean} [params.autoReconnect=true] - Whether to automatically attempt reconnection. + * @param {number} [params.maxConnectRetries=Infinity] - Maximum number of reconnection attempts. + * @param {boolean} [params.heartbeat=true] - Whether to use heartbeat to monitor connection health. + * @param {string} [params.url] - WebSocket server URL to connect to. + * @param {string} [params.token] - Authentication token to include in the connection. + */ constructor(params = {}) { this.params = { - maxConnectRetries: 3, + refName: "default", + autoReconnect: true, + maxConnectRetries: Infinity, + heartbeat: true, ...params, } } + /** @type {string} Client library version */ + static version = version + /** @type {number} Timeout for heartbeat checks in milliseconds */ + static heartbeatTimeout = 10000 + /** @type {number} Delay between reconnection attempts in milliseconds */ + static reconnectTimeout = 5000 + + /** + * Gets the current library version. + * + * @returns {string} Current version string. + */ + get version() { + return this.constructor.version + } + + /** + * Client state object. + * + * @type {Object} + * @property {string|null} id - Client ID assigned by the server. + * @property {boolean} connected - Whether the client is currently connected. + * @property {boolean} authenticated - Whether the client is authenticated. + * @property {number|null} lastPing - Timestamp of the last ping sent. + * @property {number|null} lastPong - Timestamp of the last pong received. + * @property {number|null} latency - Current connection latency in milliseconds. + * @property {boolean} reconnecting - Whether the client is attempting to reconnect. + * @property {number} connectionRetryCount - Number of reconnection attempts made. + */ state = { id: null, connected: false, + authenticated: false, + lastPing: null, + lastPong: null, + latency: null, + reconnecting: false, + connectionRetryCount: 0, } + /** @type {WebSocket|null} Active WebSocket connection */ socket = null + /** @type {Set} Collection of event handlers */ handlers = new Set() + /** @type {TopicsController} Controller for topic-based subscriptions */ topics = new TopicsController(this) + /** + * Establishes a connection to the WebSocket server. + * Automatically disconnects any existing connection first. + * + * @returns {Promise} Promise that resolves when the connection is established. + */ async connect() { if (this.socket) { await this.disconnect() @@ -42,16 +106,32 @@ export class RTEngineClient { }) } + /** + * Closes the current WebSocket connection. + * + * @returns {Promise} Promise resolving to false if no connection exists, true otherwise. + */ async disconnect() { if (!this.socket) { return false } - this.topics.unsubscribeAll() + if (!this.state.reconnecting) { + this.topics.unsubscribeAll() + } + this.socket.close() this.socket = null + + return true } + /** + * Registers an event handler. + * + * @param {string} event - Event name to listen for. + * @param {Function} handler - Function to call when the event is received. + */ on = (event, handler) => { this.handlers.add({ event, @@ -59,6 +139,12 @@ export class RTEngineClient { }) } + /** + * Removes an event handler. + * + * @param {string} event - Event name to stop listening for. + * @param {Function} handler - Handler function to remove. + */ off = (event, handler) => { this.handlers.delete({ event, @@ -66,6 +152,13 @@ export class RTEngineClient { }) } + /** + * Registers a one-time event handler. + * The handler will be automatically removed after the first time it's called. + * + * @param {string} event - Event name to listen for. + * @param {Function} handler - Function to call when the event is received. + */ once = (event, handler) => { this.handlers.add({ event, @@ -74,19 +167,101 @@ export class RTEngineClient { }) } + /** + * Sends an event to the WebSocket server. + * + * @param {string} event - Event name to emit. + * @param {*} data - Data to send with the event. + * @returns {Promise} Promise that resolves when the event is sent, or null if not connected. + */ emit = async (event, data) => { - if (!this.socket) { - throw new Error("Failed to send, socket not connected") + // TODO: implement a msg queue + if (!this.socket || !this.state.connected) { + return null } - return await this.socket.send(JSON.stringify({ event, data })) + return await this.socket.send(this.#_encode({ event, data })) } + /** + * Encodes a payload to JSON string. + * + * @private + * @param {Object} payload - Payload to encode. + * @returns {string} JSON string. + */ + #_encode(payload) { + return JSON.stringify(payload) + } + + /** + * Decodes a JSON string into an object. + * + * @private + * @param {string} payload - JSON string to decode. + * @returns {Object} Decoded object. + */ #_decode(payload) { return JSON.parse(payload) } - //* HANDLERS + /** + * Handles WebSocket open event. + * + * @private + * @param {Event} e - WebSocket open event. + */ + #handleOpen(e) { + if (this.state.reconnecting === true) { + console.log( + `[rt/${this.params.refName}] Connection reconnected at retry [${this.state.connectionRetryCount}]`, + ) + this.#dispatchToHandlers("reconnected") + } + + this.state.connected = true + this.state.connectionRetryCount = 0 + this.state.reconnecting = false + + this.#dispatchToHandlers("open") + + // if heartbeat is enabled, start the heartbeat check + if (this.params.heartbeat === true) { + this.#startHeartbeat() + } + } + + /** + * Handles WebSocket close event. + * + * @private + * @param {CloseEvent} e - WebSocket close event. + */ + #handleClose(e) { + this.state.connected = false + this.#dispatchToHandlers("close") + + if (this.params.autoReconnect === true) { + return this.#tryReconnect() + } + } + + /** + * Handles WebSocket error event. + * + * @private + * @param {Event} error - WebSocket error event. + */ + #handleError(error) { + this.#dispatchToHandlers("error", error) + } + + /** + * Handles WebSocket message event. + * + * @private + * @param {MessageEvent} event - WebSocket message event. + */ #handleMessage(event) { try { const payload = this.#_decode(event.data) @@ -95,40 +270,139 @@ export class RTEngineClient { throw new Error("Invalid event or payload") } - return this.#dispatchToHandlers(payload.event, payload.data) + return this.#dispatchToHandlers( + payload.event, + payload.data, + payload, + ) } catch (error) { - console.error("Error handling message:", error) + console.error( + `[rt/${this.params.refName}] Error handling message:`, + error, + ) } } - #handleClose() { - this.state.connected = false - this.#dispatchToHandlers("disconnect") - } - - #handleOpen() { - this.state.connected = true - this.#dispatchToHandlers("connect") - } - - #handleError(error) { - console.error("WebSocket connection error:", error) - this.#dispatchToHandlers("error") - } - + /** + * Built-in event handlers for common events. + * + * @type {Object} + */ baseHandlers = { + /** + * Handles the 'connected' event. + * + * @param {Object} data - Connection data from server. + */ connected: (data) => { - this.state.connected = true - if (data.id) { this.state.id = data.id + this.state.authenticated = data.authenticated } }, + /** + * Handles the 'reconnected' event. + * + * @param {Object} data - Reconnection data. + */ + reconnected: (data) => { + this.topics.regenerate() + }, + /** + * Handles error events. + * + * @param {Error} error - Error object. + */ error: (error) => { - console.error(error) + console.error( + `[rt/${this.params.refName}] Connection error:`, + error, + ) + }, + /** + * Handles pong responses for heartbeat. + * + * @param {Object} data - Pong data. + */ + pong: (data) => { + this.state.lastPong = performance.now() }, } + /** + * Starts the heartbeat process to monitor connection health. + * + * @private + * @returns {null|void} Null if not connected, void otherwise. + */ #startHeartbeat() { + if (!this.state.connected) { + return null + } + + this.state.lastPong = null + this.state.lastPing = performance.now() + + this.emit("ping").catch(() => null) + + setTimeout(() => { + // if no last pong is received, it means the connection is lost or the latency is too high + if (this.state.lastPong === null) { + this.state.connected = false + + // if max connect retries is more than 0, retry connections + if (this.params.autoReconnect === true) { + return this.#tryReconnect() + } + } + + this.state.latency = Number( + this.state.lastPong - this.state.lastPing, + ).toFixed(2) + + this.#startHeartbeat() + }, this.constructor.heartbeatTimeout) + } + + /** + * Attempts to reconnect to the WebSocket server. + * + * @private + * @returns {null|void} Null if max retries reached, void otherwise. + */ + #tryReconnect() { + // check if retries are left, if so, retry connection + if ( + this.params.maxConnectRetries !== Infinity && + this.state.connectionRetryCount > this.params.maxConnectRetries + ) { + console.error( + `[rt/${this.params.refName}] Reconnection failed: Maximum retries reached [${this.params.maxConnectRetries}]\nClosing socket permanently...`, + ) + this.#dispatchToHandlers("reconnection_failed") + return null + } + + this.state.connectionRetryCount = this.state.connectionRetryCount + 1 + this.state.reconnecting = true + + console.log( + `[rt/${this.params.refName}] Connection timeout, retrying connection in ${this.constructor.reconnectTimeout}ms [${this.state.connectionRetryCount - 1}/${this.params.maxConnectRetries}]`, + ) + + setTimeout(() => { + this.connect() + }, this.constructor.reconnectTimeout) + } + + /** + * Dispatches events to registered handlers. + * + * @private + * @param {string} event - Event name to dispatch. + * @param {*} data - Event data. + * @param {Object} [payload] - Full event payload. + * @returns {Promise} Promise that resolves when all handlers have been called. + */ async #dispatchToHandlers(event, data) { if (this.baseHandlers[event]) { await this.baseHandlers[event](data) diff --git a/client/src/rtengine/topics.js b/client/src/rtengine/topics.js index 3e95ae4..d7e3699 100644 --- a/client/src/rtengine/topics.js +++ b/client/src/rtengine/topics.js @@ -1,27 +1,108 @@ +/** + * Controller for managing topic subscriptions in a real-time client. + * Allows subscribing, unsubscribing, and listening to events associated with specific topics. + */ class TopicsController { + /** + * Creates an instance of the TopicsController. + * + * @param {Object} client - Real-time client that will handle communications. + */ constructor(client) { this.client = client } + /** + * Set that stores the topics currently subscribed to. + * @type {Set} + */ subscribed = new Set() + /** + * Registers a callback for a specific event on a given topic. + * + * @param {string} topic - The topic to associate the event with. + * @param {string} event - Name of the event to listen for. + * @param {Function} callback - Function to execute when the event occurs on the specified topic. + * @param {*} callback.data - Data received from the event. + * @param {Object} callback.payload - Complete event payload, includes topic information. + */ + on = (topic, event, callback) => { + this.client.on(event, (data, payload) => { + if (payload.topic === topic) { + callback(data, payload) + } + }) + } + + /** + * Subscribes to a specific topic. + * + * @param {string} topic - The topic to subscribe to. + * @returns {Promise} - Promise that resolves to true when the subscription is complete. + */ subscribe = async (topic) => { + console.log( + `[rt/${this.client.params.refName}] Subscribing to topic:`, + topic, + ) + await this.client.emit("topic:subscribe", topic) - this.subscribed.add(topic) + + if (!this.subscribed.has(topic)) { + this.subscribed.add(topic) + } return true } + /** + * Unsubscribes from a specific topic. + * + * @param {string} topic - The topic to unsubscribe from. + * @returns {Promise} - Promise that resolves to true when the unsubscription is complete. + */ unsubscribe = async (topic) => { + console.log( + `[rt/${this.client.params.refName}] Unsubscribing from topic:`, + topic, + ) + await this.client.emit("topic:unsubscribe", topic) + this.subscribed.delete(topic) return true } + /** + * Unsubscribes from all currently subscribed topics. + * + * @returns {Promise} - Promise that resolves to true when all unsubscriptions are complete. + */ unsubscribeAll = async () => { for (const topic of this.subscribed) { - await this.leave(topic) + await this.unsubscribe(topic) + } + + return true + } + + /** + * Regenerates all current subscriptions by unsubscribing and resubscribing. + * Useful for updating connections or refreshing the subscription state. + * + * @returns {Promise} - Promise that resolves to true when regeneration is complete. + */ + regenerate = async () => { + console.log( + `[rt/${this.client.params.refName}] Regenerating topics...`, + this.subscribed, + ) + + for (const topic of this.subscribed.values()) { + await this.client.emit("topic:unsubscribe", topic) + await this.client.emit("topic:subscribe", topic) } return true