improve reconnection

This commit is contained in:
SrGooglo 2025-03-26 13:47:13 +00:00
parent 174c00af87
commit 4dd94c388a
2 changed files with 383 additions and 28 deletions

View File

@ -1,24 +1,88 @@
import TopicsController from "./topics"
import { version } from "../../package.json"
/**
* WebSocket client for real-time communication with a backend service.
* Provides connection management, automatic reconnection, heartbeat monitoring,
* event handling, and topic-based subscriptions.
*/
export class RTEngineClient {
/**
* Creates a new RTEngineClient instance.
*
* @param {Object} [params={}] - Configuration parameters for the client.
* @param {string} [params.refName="default"] - Reference name for this client instance.
* @param {boolean} [params.autoReconnect=true] - Whether to automatically attempt reconnection.
* @param {number} [params.maxConnectRetries=Infinity] - Maximum number of reconnection attempts.
* @param {boolean} [params.heartbeat=true] - Whether to use heartbeat to monitor connection health.
* @param {string} [params.url] - WebSocket server URL to connect to.
* @param {string} [params.token] - Authentication token to include in the connection.
*/
constructor(params = {}) {
this.params = {
maxConnectRetries: 3,
refName: "default",
autoReconnect: true,
maxConnectRetries: Infinity,
heartbeat: true,
...params,
}
}
/** @type {string} Client library version */
static version = version
/** @type {number} Timeout for heartbeat checks in milliseconds */
static heartbeatTimeout = 10000
/** @type {number} Delay between reconnection attempts in milliseconds */
static reconnectTimeout = 5000
/**
* Gets the current library version.
*
* @returns {string} Current version string.
*/
get version() {
return this.constructor.version
}
/**
* Client state object.
*
* @type {Object}
* @property {string|null} id - Client ID assigned by the server.
* @property {boolean} connected - Whether the client is currently connected.
* @property {boolean} authenticated - Whether the client is authenticated.
* @property {number|null} lastPing - Timestamp of the last ping sent.
* @property {number|null} lastPong - Timestamp of the last pong received.
* @property {number|null} latency - Current connection latency in milliseconds.
* @property {boolean} reconnecting - Whether the client is attempting to reconnect.
* @property {number} connectionRetryCount - Number of reconnection attempts made.
*/
state = {
id: null,
connected: false,
authenticated: false,
lastPing: null,
lastPong: null,
latency: null,
reconnecting: false,
connectionRetryCount: 0,
}
/** @type {WebSocket|null} Active WebSocket connection */
socket = null
/** @type {Set} Collection of event handlers */
handlers = new Set()
/** @type {TopicsController} Controller for topic-based subscriptions */
topics = new TopicsController(this)
/**
* Establishes a connection to the WebSocket server.
* Automatically disconnects any existing connection first.
*
* @returns {Promise<void>} Promise that resolves when the connection is established.
*/
async connect() {
if (this.socket) {
await this.disconnect()
@ -42,16 +106,32 @@ export class RTEngineClient {
})
}
/**
* Closes the current WebSocket connection.
*
* @returns {Promise<boolean>} Promise resolving to false if no connection exists, true otherwise.
*/
async disconnect() {
if (!this.socket) {
return false
}
this.topics.unsubscribeAll()
if (!this.state.reconnecting) {
this.topics.unsubscribeAll()
}
this.socket.close()
this.socket = null
return true
}
/**
* Registers an event handler.
*
* @param {string} event - Event name to listen for.
* @param {Function} handler - Function to call when the event is received.
*/
on = (event, handler) => {
this.handlers.add({
event,
@ -59,6 +139,12 @@ export class RTEngineClient {
})
}
/**
* Removes an event handler.
*
* @param {string} event - Event name to stop listening for.
* @param {Function} handler - Handler function to remove.
*/
off = (event, handler) => {
this.handlers.delete({
event,
@ -66,6 +152,13 @@ export class RTEngineClient {
})
}
/**
* Registers a one-time event handler.
* The handler will be automatically removed after the first time it's called.
*
* @param {string} event - Event name to listen for.
* @param {Function} handler - Function to call when the event is received.
*/
once = (event, handler) => {
this.handlers.add({
event,
@ -74,19 +167,101 @@ export class RTEngineClient {
})
}
/**
* Sends an event to the WebSocket server.
*
* @param {string} event - Event name to emit.
* @param {*} data - Data to send with the event.
* @returns {Promise<null|void>} Promise that resolves when the event is sent, or null if not connected.
*/
emit = async (event, data) => {
if (!this.socket) {
throw new Error("Failed to send, socket not connected")
// TODO: implement a msg queue
if (!this.socket || !this.state.connected) {
return null
}
return await this.socket.send(JSON.stringify({ event, data }))
return await this.socket.send(this.#_encode({ event, data }))
}
/**
* Encodes a payload to JSON string.
*
* @private
* @param {Object} payload - Payload to encode.
* @returns {string} JSON string.
*/
#_encode(payload) {
return JSON.stringify(payload)
}
/**
* Decodes a JSON string into an object.
*
* @private
* @param {string} payload - JSON string to decode.
* @returns {Object} Decoded object.
*/
#_decode(payload) {
return JSON.parse(payload)
}
//* HANDLERS
/**
* Handles WebSocket open event.
*
* @private
* @param {Event} e - WebSocket open event.
*/
#handleOpen(e) {
if (this.state.reconnecting === true) {
console.log(
`[rt/${this.params.refName}] Connection reconnected at retry [${this.state.connectionRetryCount}]`,
)
this.#dispatchToHandlers("reconnected")
}
this.state.connected = true
this.state.connectionRetryCount = 0
this.state.reconnecting = false
this.#dispatchToHandlers("open")
// if heartbeat is enabled, start the heartbeat check
if (this.params.heartbeat === true) {
this.#startHeartbeat()
}
}
/**
* Handles WebSocket close event.
*
* @private
* @param {CloseEvent} e - WebSocket close event.
*/
#handleClose(e) {
this.state.connected = false
this.#dispatchToHandlers("close")
if (this.params.autoReconnect === true) {
return this.#tryReconnect()
}
}
/**
* Handles WebSocket error event.
*
* @private
* @param {Event} error - WebSocket error event.
*/
#handleError(error) {
this.#dispatchToHandlers("error", error)
}
/**
* Handles WebSocket message event.
*
* @private
* @param {MessageEvent} event - WebSocket message event.
*/
#handleMessage(event) {
try {
const payload = this.#_decode(event.data)
@ -95,40 +270,139 @@ export class RTEngineClient {
throw new Error("Invalid event or payload")
}
return this.#dispatchToHandlers(payload.event, payload.data)
return this.#dispatchToHandlers(
payload.event,
payload.data,
payload,
)
} catch (error) {
console.error("Error handling message:", error)
console.error(
`[rt/${this.params.refName}] Error handling message:`,
error,
)
}
}
#handleClose() {
this.state.connected = false
this.#dispatchToHandlers("disconnect")
}
#handleOpen() {
this.state.connected = true
this.#dispatchToHandlers("connect")
}
#handleError(error) {
console.error("WebSocket connection error:", error)
this.#dispatchToHandlers("error")
}
/**
* Built-in event handlers for common events.
*
* @type {Object}
*/
baseHandlers = {
/**
* Handles the 'connected' event.
*
* @param {Object} data - Connection data from server.
*/
connected: (data) => {
this.state.connected = true
if (data.id) {
this.state.id = data.id
this.state.authenticated = data.authenticated
}
},
/**
* Handles the 'reconnected' event.
*
* @param {Object} data - Reconnection data.
*/
reconnected: (data) => {
this.topics.regenerate()
},
/**
* Handles error events.
*
* @param {Error} error - Error object.
*/
error: (error) => {
console.error(error)
console.error(
`[rt/${this.params.refName}] Connection error:`,
error,
)
},
/**
* Handles pong responses for heartbeat.
*
* @param {Object} data - Pong data.
*/
pong: (data) => {
this.state.lastPong = performance.now()
},
}
/**
* Starts the heartbeat process to monitor connection health.
*
* @private
* @returns {null|void} Null if not connected, void otherwise.
*/ #startHeartbeat() {
if (!this.state.connected) {
return null
}
this.state.lastPong = null
this.state.lastPing = performance.now()
this.emit("ping").catch(() => null)
setTimeout(() => {
// if no last pong is received, it means the connection is lost or the latency is too high
if (this.state.lastPong === null) {
this.state.connected = false
// if max connect retries is more than 0, retry connections
if (this.params.autoReconnect === true) {
return this.#tryReconnect()
}
}
this.state.latency = Number(
this.state.lastPong - this.state.lastPing,
).toFixed(2)
this.#startHeartbeat()
}, this.constructor.heartbeatTimeout)
}
/**
* Attempts to reconnect to the WebSocket server.
*
* @private
* @returns {null|void} Null if max retries reached, void otherwise.
*/
#tryReconnect() {
// check if retries are left, if so, retry connection
if (
this.params.maxConnectRetries !== Infinity &&
this.state.connectionRetryCount > this.params.maxConnectRetries
) {
console.error(
`[rt/${this.params.refName}] Reconnection failed: Maximum retries reached [${this.params.maxConnectRetries}]\nClosing socket permanently...`,
)
this.#dispatchToHandlers("reconnection_failed")
return null
}
this.state.connectionRetryCount = this.state.connectionRetryCount + 1
this.state.reconnecting = true
console.log(
`[rt/${this.params.refName}] Connection timeout, retrying connection in ${this.constructor.reconnectTimeout}ms [${this.state.connectionRetryCount - 1}/${this.params.maxConnectRetries}]`,
)
setTimeout(() => {
this.connect()
}, this.constructor.reconnectTimeout)
}
/**
* Dispatches events to registered handlers.
*
* @private
* @param {string} event - Event name to dispatch.
* @param {*} data - Event data.
* @param {Object} [payload] - Full event payload.
* @returns {Promise<void>} Promise that resolves when all handlers have been called.
*/
async #dispatchToHandlers(event, data) {
if (this.baseHandlers[event]) {
await this.baseHandlers[event](data)

View File

@ -1,27 +1,108 @@
/**
* Controller for managing topic subscriptions in a real-time client.
* Allows subscribing, unsubscribing, and listening to events associated with specific topics.
*/
class TopicsController {
/**
* Creates an instance of the TopicsController.
*
* @param {Object} client - Real-time client that will handle communications.
*/
constructor(client) {
this.client = client
}
/**
* Set that stores the topics currently subscribed to.
* @type {Set<string>}
*/
subscribed = new Set()
/**
* Registers a callback for a specific event on a given topic.
*
* @param {string} topic - The topic to associate the event with.
* @param {string} event - Name of the event to listen for.
* @param {Function} callback - Function to execute when the event occurs on the specified topic.
* @param {*} callback.data - Data received from the event.
* @param {Object} callback.payload - Complete event payload, includes topic information.
*/
on = (topic, event, callback) => {
this.client.on(event, (data, payload) => {
if (payload.topic === topic) {
callback(data, payload)
}
})
}
/**
* Subscribes to a specific topic.
*
* @param {string} topic - The topic to subscribe to.
* @returns {Promise<boolean>} - Promise that resolves to true when the subscription is complete.
*/
subscribe = async (topic) => {
console.log(
`[rt/${this.client.params.refName}] Subscribing to topic:`,
topic,
)
await this.client.emit("topic:subscribe", topic)
this.subscribed.add(topic)
if (!this.subscribed.has(topic)) {
this.subscribed.add(topic)
}
return true
}
/**
* Unsubscribes from a specific topic.
*
* @param {string} topic - The topic to unsubscribe from.
* @returns {Promise<boolean>} - Promise that resolves to true when the unsubscription is complete.
*/
unsubscribe = async (topic) => {
console.log(
`[rt/${this.client.params.refName}] Unsubscribing from topic:`,
topic,
)
await this.client.emit("topic:unsubscribe", topic)
this.subscribed.delete(topic)
return true
}
/**
* Unsubscribes from all currently subscribed topics.
*
* @returns {Promise<boolean>} - Promise that resolves to true when all unsubscriptions are complete.
*/
unsubscribeAll = async () => {
for (const topic of this.subscribed) {
await this.leave(topic)
await this.unsubscribe(topic)
}
return true
}
/**
* Regenerates all current subscriptions by unsubscribing and resubscribing.
* Useful for updating connections or refreshing the subscription state.
*
* @returns {Promise<boolean>} - Promise that resolves to true when regeneration is complete.
*/
regenerate = async () => {
console.log(
`[rt/${this.client.params.refName}] Regenerating topics...`,
this.subscribed,
)
for (const topic of this.subscribed.values()) {
await this.client.emit("topic:unsubscribe", topic)
await this.client.emit("topic:subscribe", topic)
}
return true