implemented call & destroy

This commit is contained in:
SrGooglo 2025-04-09 20:40:02 +00:00
parent de6514e5ee
commit 920b3e3c08

View File

@ -1,5 +1,6 @@
import TopicsController from "./topics" import TopicsController from "./topics"
import { version } from "../../package.json" import { version } from "../../package.json"
import path from "node:path"
/** /**
* WebSocket client for real-time communication with a backend service. * WebSocket client for real-time communication with a backend service.
@ -28,6 +29,8 @@ export class RTEngineClient {
} }
} }
abortController = new AbortController()
/** @type {string} Client library version */ /** @type {string} Client library version */
static version = version static version = version
/** @type {number} Timeout for heartbeat checks in milliseconds */ /** @type {number} Timeout for heartbeat checks in milliseconds */
@ -84,6 +87,10 @@ export class RTEngineClient {
* @returns {Promise<void>} Promise that resolves when the connection is established. * @returns {Promise<void>} Promise that resolves when the connection is established.
*/ */
async connect() { async connect() {
if (this.abortController.signal.aborted) {
return null
}
if (this.socket) { if (this.socket) {
await this.disconnect() await this.disconnect()
} }
@ -126,6 +133,27 @@ export class RTEngineClient {
return true return true
} }
/**
* Permanently destroys the client connection, closes the socket,
* cancels any pending reconnection attempts, and prevents further reconnection.
*
* @returns {Promise<void>} A promise that resolves when the connection is destroyed.
*/
async destroy() {
console.log(`[rt] Destroying connection`)
if (this.socket) {
this.socket.close()
this.socket = null
}
// cancel reconnecttions if any
this.state.reconnecting = false
this.state.reconnectAttempts = 0
this.abortController.abort()
}
/** /**
* Registers an event handler. * Registers an event handler.
* *
@ -183,6 +211,30 @@ export class RTEngineClient {
return await this.socket.send(this.#_encode({ event, data })) return await this.socket.send(this.#_encode({ event, data }))
} }
/**
* Sends an event to the WebSocket server and returns a message from the server.
*
* @param {string} event - Event name to emit.
* @param {*} data - Data to send with the event.
* @returns {Promise<object|string>} Promise that resolves with the message from the server.
*/
call = (event, data) => {
return new Promise((resolve, reject) => {
this.once(`ack_${event}`, (...args) => {
resolve(...args)
})
this.socket.send(this.#_encode({ event, data, ack: true }))
})
}
/**
* Removes all event listeners registered.
*/
removeAllListeners = () => {
this.handlers.clear()
}
/** /**
* Encodes a payload to JSON string. * Encodes a payload to JSON string.
* *
@ -270,6 +322,8 @@ export class RTEngineClient {
throw new Error("Invalid event or payload") throw new Error("Invalid event or payload")
} }
this.#dispatchToHandlers("message", payload.data, payload)
return this.#dispatchToHandlers( return this.#dispatchToHandlers(
payload.event, payload.event,
payload.data, payload.data,
@ -314,10 +368,7 @@ export class RTEngineClient {
* @param {Error} error - Error object. * @param {Error} error - Error object.
*/ */
error: (error) => { error: (error) => {
console.error( console.error(`[rt/${this.params.refName}] error:`, error)
`[rt/${this.params.refName}] Connection error:`,
error,
)
}, },
/** /**
* Handles pong responses for heartbeat. * Handles pong responses for heartbeat.
@ -370,6 +421,10 @@ export class RTEngineClient {
* @returns {null|void} Null if max retries reached, void otherwise. * @returns {null|void} Null if max retries reached, void otherwise.
*/ */
#tryReconnect() { #tryReconnect() {
if (this.abortController.signal.aborted) {
return null
}
// check if retries are left, if so, retry connection // check if retries are left, if so, retry connection
if ( if (
this.params.maxConnectRetries !== Infinity && this.params.maxConnectRetries !== Infinity &&
@ -403,14 +458,14 @@ export class RTEngineClient {
* @param {Object} [payload] - Full event payload. * @param {Object} [payload] - Full event payload.
* @returns {Promise<void>} Promise that resolves when all handlers have been called. * @returns {Promise<void>} Promise that resolves when all handlers have been called.
*/ */
async #dispatchToHandlers(event, data) { async #dispatchToHandlers(event, ...args) {
if (this.baseHandlers[event]) { if (this.baseHandlers[event]) {
await this.baseHandlers[event](data) await this.baseHandlers[event](...args)
} }
for (const handler of this.handlers) { for (const handler of this.handlers) {
if (handler.event === event) { if (handler.event === event) {
handler.handler(data) handler.handler(...args)
if (handler.once === true) { if (handler.once === true) {
this.handlers.delete(handler) this.handlers.delete(handler)