From fcb4672624b07819d3f992b94e4170163012b16a Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Tue, 25 Mar 2025 22:46:25 +0000 Subject: [PATCH] abstract client --- src/classes/rtengineng/client.js | 47 ++++++++++++++ src/classes/rtengineng/events.js | 8 +++ src/classes/rtengineng/index.js | 106 ++++++++++++++++--------------- 3 files changed, 110 insertions(+), 51 deletions(-) create mode 100644 src/classes/rtengineng/client.js create mode 100644 src/classes/rtengineng/events.js diff --git a/src/classes/rtengineng/client.js b/src/classes/rtengineng/client.js new file mode 100644 index 0000000..3bb6cd2 --- /dev/null +++ b/src/classes/rtengineng/client.js @@ -0,0 +1,47 @@ +class Client { + constructor(socket) { + this.socket = socket + this.id = socket.context.id + + this.userId = socket.context.user?._id || null + this.authed = !!socket.context.session + } + + emit(event, data) { + const payload = JSON.stringify({ event, data }) + + return this.socket.send(payload) + } + + toTopic(topic, event, data, self = false) { + const payload = JSON.stringify({ + topic, + event, + data, + }) + + this.socket.publish(topic, payload) + + if (self === true) { + this.emit(event, data) + } + } + + error(error) { + if (error instanceof Error) { + error = error.toString() + } + + return this.emit("error", error) + } + + subscribe(topic) { + return this.socket.subscribe(topic) + } + + unsubscribe(topic) { + return this.socket.unsubscribe(topic) + } +} + +export default Client diff --git a/src/classes/rtengineng/events.js b/src/classes/rtengineng/events.js new file mode 100644 index 0000000..6dd94ad --- /dev/null +++ b/src/classes/rtengineng/events.js @@ -0,0 +1,8 @@ +export default { + "topic:join": async (client, topic) => { + client.subscribe(topic) + }, + "topic:leave": async (client, topic) => { + client.unsubscribe(topic) + }, +} diff --git a/src/classes/rtengineng/index.js b/src/classes/rtengineng/index.js index 96171fa..46b06a4 100644 --- a/src/classes/rtengineng/index.js +++ b/src/classes/rtengineng/index.js @@ -1,5 +1,8 @@ import HyperExpress from "hyper-express" +import Client from "./client" +import BuiltInEvents from "./events" + class RTEngineNG { constructor(config = {}) { this.events = new Map() @@ -10,85 +13,84 @@ class RTEngineNG { } } + for (const [event, handler] of Object.entries(BuiltInEvents)) { + this.events.set(event, handler) + } + this.onUpgrade = config.onUpgrade || null this.onConnection = config.onConnection || null - this.onDisconnection = config.onDisconnection || null + this.onDisconnect = config.onDisconnect || null } - clients = new Set() + clients = new Map() router = new HyperExpress.Router() senders = { broadcast: async (event, data) => { - for (const client of this.clients) { - this.sendMessage(client, event, data) + for (const [socketId, client] of this.clients) { + client.emit(event, data) } }, + toTopic: async (topic, event, data) => { + if (!this.engine) { + throw new Error("Engine not initialized") + } + + return this.engine.app.publish( + topic, + JSON.stringify({ + topic: topic, + event: event, + data: data, + }), + ) + }, } - sendMessage = (socket, event, data) => { - const payload = JSON.stringify({ event, data }) + find = { + clientsByUserId: (userId) => { + const clients = [] - socket.send(payload) - } + for (const [socketId, client] of this.clients) { + if (client.userId === userId) { + clients.push(client) + } + } - sendToTopic = (socket, topic, event, data, self = false) => { - const payload = JSON.stringify({ - topic, - event, - data, - }) - - socket.publish(topic, payload) - - if (self === true) { - this.sendMessage(socket, event, data) - } - } - - sendError = (socket, error) => { - if (error instanceof Error) { - error = error.toString() - } - - this.sendMessage(socket, "error", error) + return clients + }, } handleMessage = async (socket, payload) => { + const client = this.clients.get(socket.context.id) + + if (!client) { + return socket.send( + JSON.stringify({ event: "error", data: "Client not found" }), + ) + } + let message = null try { message = JSON.parse(payload) if (typeof message.event !== "string") { - return this.sendError(socket, "Invalid event type") + return client.error("Invalid event type") } const handler = this.events.get(message.event) if (typeof handler === "function") { - const handlerSenders = { - ...this.senders, - toTopic: (room, event, data, self) => { - this.sendToTopic(socket, room, event, data, self) - }, - send: (event, data) => { - this.sendMessage(socket, event, data) - }, - error: (error) => { - this.sendError(socket, error) - }, - } - - await handler(socket, message.data, handlerSenders) + await handler(client, message.data) } else { console.log(`[ws] 404 /${message.event}`) - this.sendError(socket, "Event handler not found") + client.error("Event handler not found") } } catch (error) { console.log(`[ws] 500 /${message?.event ?? "unknown"} >`, error) - this.sendError(socket, error) + client.error(error) } } @@ -98,17 +100,19 @@ class RTEngineNG { } socket.on("message", (payload) => this.handleMessage(socket, payload)) - socket.on("close", () => this.handleDisconnection(socket)) + socket.on("close", () => this.handleDisconnect(socket)) - this.clients.add(socket) + const client = new Client(socket) + + this.clients.set(socket.context.id, client) } - handleDisconnection = async (socket) => { - if (this.onDisconnection) { - await this.onDisconnection(socket) + handleDisconnect = async (socket) => { + if (typeof this.onDisconnect === "function") { + await this.onDisconnect(socket) } - this.clients.delete(socket) + this.clients.delete(socket.context.id) } handleUpgrade = async (req, res) => {