From 048040bd7f36f4b17fae0cf6e36c796d525b17f1 Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Tue, 25 Mar 2025 19:04:00 +0000 Subject: [PATCH] support new experimental ws server --- src/classes/rtengineng/index.js | 158 ++++++++++++++++++ src/engines/hyper-express-ng/index.js | 149 +++++++++++++++++ src/engines/index.js | 9 + .../registerWebsocketsEvents/index.js | 46 +++-- src/server.js | 21 +-- src/utils/flatRouteredFunctions.js | 23 +++ src/utils/getRouteredFunctions.js | 24 +++ 7 files changed, 391 insertions(+), 39 deletions(-) create mode 100644 src/classes/rtengineng/index.js create mode 100644 src/engines/hyper-express-ng/index.js create mode 100644 src/engines/index.js create mode 100644 src/utils/flatRouteredFunctions.js create mode 100644 src/utils/getRouteredFunctions.js diff --git a/src/classes/rtengineng/index.js b/src/classes/rtengineng/index.js new file mode 100644 index 0000000..96171fa --- /dev/null +++ b/src/classes/rtengineng/index.js @@ -0,0 +1,158 @@ +import HyperExpress from "hyper-express" + +class RTEngineNG { + constructor(config = {}) { + this.events = new Map() + + if (typeof config.events === "object") { + for (const [event, handler] of Object.entries(config.events)) { + this.events.set(event, handler) + } + } + + this.onUpgrade = config.onUpgrade || null + this.onConnection = config.onConnection || null + this.onDisconnection = config.onDisconnection || null + } + + clients = new Set() + + router = new HyperExpress.Router() + + senders = { + broadcast: async (event, data) => { + for (const client of this.clients) { + this.sendMessage(client, event, data) + } + }, + } + + sendMessage = (socket, event, data) => { + const payload = JSON.stringify({ event, data }) + + socket.send(payload) + } + + sendToTopic = (socket, topic, event, data, self = false) => { + const payload = JSON.stringify({ + topic, + event, + data, + }) + + socket.publish(topic, payload) + + if (self === true) { + this.sendMessage(socket, event, data) + } + } + + sendError = (socket, error) => { + if (error instanceof Error) { + error = error.toString() + } + + this.sendMessage(socket, "error", error) + } + + handleMessage = async (socket, payload) => { + let message = null + + try { + message = JSON.parse(payload) + + if (typeof message.event !== "string") { + return this.sendError(socket, "Invalid event type") + } + + const handler = this.events.get(message.event) + + if (typeof handler === "function") { + const handlerSenders = { + ...this.senders, + toTopic: (room, event, data, self) => { + this.sendToTopic(socket, room, event, data, self) + }, + send: (event, data) => { + this.sendMessage(socket, event, data) + }, + error: (error) => { + this.sendError(socket, error) + }, + } + + await handler(socket, message.data, handlerSenders) + } else { + console.log(`[ws] 404 /${message.event}`) + this.sendError(socket, "Event handler not found") + } + } catch (error) { + console.log(`[ws] 500 /${message?.event ?? "unknown"} >`, error) + this.sendError(socket, error) + } + } + + handleConnection = async (socket) => { + if (this.onConnection) { + await this.onConnection(socket) + } + + socket.on("message", (payload) => this.handleMessage(socket, payload)) + socket.on("close", () => this.handleDisconnection(socket)) + + this.clients.add(socket) + } + + handleDisconnection = async (socket) => { + if (this.onDisconnection) { + await this.onDisconnection(socket) + } + + this.clients.delete(socket) + } + + handleUpgrade = async (req, res) => { + try { + const context = { + id: nanoid(), + token: req.query.token, + user: null, + httpHeaders: req.headers, + } + + if (typeof this.onUpgrade === "function") { + await this.onUpgrade(context, req.query.token, res) + } else { + res.upgrade(context) + } + } catch (error) { + console.error("Error upgrading connection:", error) + res.status(401).end() + } + } + + registerEvent = (event, handler) => { + this.events.set(event, handler) + } + + registerEvents = (obj) => { + for (const [event, handler] of Object.entries(obj)) { + this.registerEvent(event, handler) + } + } + + attach = async (engine) => { + this.engine = engine + + this.router.ws("/", this.handleConnection) + this.router.upgrade("/", this.handleUpgrade) + + this.engine.app.use("/", this.router) + } + + close = () => { + // nothing to do, yet... + } +} + +export default RTEngineNG diff --git a/src/engines/hyper-express-ng/index.js b/src/engines/hyper-express-ng/index.js new file mode 100644 index 0000000..8a97544 --- /dev/null +++ b/src/engines/hyper-express-ng/index.js @@ -0,0 +1,149 @@ +import he from "hyper-express" +import rtengineng from "../../classes/rtengineng" + +import getRouteredFunctions from "../../utils/getRouteredFunctions" +import flatRouteredFunctions from "../../utils/flatRouteredFunctions" + +export default class HyperExpressEngineNG { + constructor(params, ctx) { + this.params = params + this.ctx = ctx + } + + app = null + ws = null + router = null + + initialize = async (params) => { + console.warn( + `hyper-express-ng is a experimental engine, some features may not be available or work properly!`, + ) + + const serverParams = { + max_body_length: 50 * 1024 * 1024, //50MB in bytes, + } + + if (params.ssl) { + serverParams.key_file_name = params.ssl?.key ?? null + serverParams.cert_file_name = params.ssl?.cert ?? null + } + + this.app = new he.Server(serverParams) + + this.router = new he.Router() + + // create a router map + if (typeof this.router.map !== "object") { + this.router.map = {} + } + + await this.router.any("*", (req, res) => { + return res.status(404).json({ + code: 404, + message: "Not found", + }) + }) + + await this.app.use(async (req, res, next) => { + if (req.method === "OPTIONS") { + // handle cors + if (params.ignoreCors) { + res.setHeader("Access-Control-Allow-Methods", "*") + res.setHeader("Access-Control-Allow-Origin", "*") + res.setHeader("Access-Control-Allow-Headers", "*") + } + + return res.status(204).end() + } + + // register body parser + if (req.headers["content-type"]) { + if ( + !req.headers["content-type"].startsWith( + "multipart/form-data", + ) + ) { + req.body = await req.urlencoded() + req.body = await req.json(req.body) + } + } + }) + + if (params.enableWebsockets) { + this.ws = new rtengineng({ + onUpgrade: params.handleWsUpgrade, + onConnection: params.handleWsConnection, + onDisconnect: params.handleWsDisconnect, + }) + + await this.ws.attach(this) + } + } + + listen = async (params) => { + if (process.env.lb_service) { + let pathOverrides = Object.keys(this.router.map).map((key) => { + return key.split("/")[1] + }) + + // remove duplicates + pathOverrides = [...new Set(pathOverrides)] + + // remove "" and _map + pathOverrides = pathOverrides.filter((key) => { + if (key === "" || key === "_map") { + return false + } + + return true + }) + + if (params.enableWebsockets) { + process.send({ + type: "router:ws:register", + id: process.env.lb_service.id, + index: process.env.lb_service.index, + data: { + namespace: params.refName, + listen_port: this.params.listen_port, + ws_path: params.wsPath ?? "/", + }, + }) + } + + if (process.send) { + // try to send router map to host + process.send({ + type: "router:register", + id: process.env.lb_service.id, + index: process.env.lb_service.index, + data: { + router_map: this.router.map, + path_overrides: pathOverrides, + listen: { + ip: this.params.listen_ip, + port: this.params.listen_port, + }, + }, + }) + } + } + + await this.app.listen(this.params.listen_port) + } + + // close must be synchronous + close = () => { + if (this.ws && typeof this.ws.close === "function") { + this.ws.close() + } + + if (typeof this.app.close === "function") { + this.app.close() + } + + if (typeof this.ctx.onClose === "function") { + this.ctx.onClose() + } + } +} diff --git a/src/engines/index.js b/src/engines/index.js new file mode 100644 index 0000000..0bf2110 --- /dev/null +++ b/src/engines/index.js @@ -0,0 +1,9 @@ +import HyperExpress from "./hyper-express" +import HyperExpressNG from "./hyper-express-ng" +import Worker from "./worker" + +export default { + "hyper-express": HyperExpress, + "hyper-express-ng": HyperExpressNG, + worker: Worker, +} diff --git a/src/initializators/registerWebsocketsEvents/index.js b/src/initializators/registerWebsocketsEvents/index.js index 963d78c..77e36c9 100644 --- a/src/initializators/registerWebsocketsEvents/index.js +++ b/src/initializators/registerWebsocketsEvents/index.js @@ -1,36 +1,28 @@ import fs from "node:fs" -import RecursiveRegister from "../../lib/recursiveRegister" +import getRouteredFunctions from "../../utils/getRouteredFunctions" +import flatRouteredFunctions from "../../utils/flatRouteredFunctions" export default async (startDir, engine) => { - if (!engine.ws) { - return engine - } + if (!engine.ws || !fs.existsSync(startDir)) { + return engine + } - if (!fs.existsSync(startDir)) { - return engine - } + let events = await getRouteredFunctions(startDir) - await RecursiveRegister({ - start: startDir, - match: async (filePath) => { - return filePath.endsWith(".js") || filePath.endsWith(".ts") - }, - onMatch: async ({ absolutePath, relativePath }) => { - let eventName = relativePath.split("/").join(":") + events = flatRouteredFunctions(events) - eventName = eventName.replace(".js", "") - eventName = eventName.replace(".ts", "") + if (typeof events !== "object") { + return engine + } - let fn = require(absolutePath) + if (typeof engine.ws.registerEvents === "function") { + await engine.ws.registerEvents(events) + } else { + for (const eventKey of Object.keys(events)) { + engine.ws.events.set(eventKey, events[eventKey]) + } + } - fn = fn.default ?? fn - - console.log(`[WEBSOCKET] register event : ${eventName} >`, fn) - - engine.ws.events.set(eventName, fn) - } - }) - - return engine -} \ No newline at end of file + return engine +} diff --git a/src/server.js b/src/server.js index ca46e24..37bad9d 100755 --- a/src/server.js +++ b/src/server.js @@ -13,17 +13,7 @@ import registerBaseEndpoints from "./initializators/registerBaseEndpoints" import registerWebsocketsEvents from "./initializators/registerWebsocketsEvents" import registerHttpRoutes from "./initializators/registerHttpRoutes" -async function loadEngine(engine) { - const enginesPath = path.resolve(__dirname, "engines") - - const selectedEnginePath = path.resolve(enginesPath, engine) - - if (!fs.existsSync(selectedEnginePath)) { - throw new Error(`Engine ${engine} not found!`) - } - - return require(selectedEnginePath).default -} +import Engines from "./engines" class Server { constructor(params = {}, controllers = {}, middlewares = {}, headers = {}) { @@ -145,6 +135,9 @@ class Server { const engineParams = { ...this.params, + handleWsUpgrade: this.handleWsUpgrade, + handleWsConnection: this.handleWsConnection, + handleWsDisconnect: this.handleWsDisconnect, handleWsAuth: this.handleWsAuth, handleAuth: this.handleHttpAuth, requireAuth: this.constructor.requireHttpAuth, @@ -153,7 +146,11 @@ class Server { } // initialize engine - this.engine = await loadEngine(this.params.useEngine) + this.engine = Engines[this.params.useEngine] + + if (!this.engine) { + throw new Error(`Engine ${this.params.useEngine} not found`) + } this.engine = new this.engine(engineParams, this) diff --git a/src/utils/flatRouteredFunctions.js b/src/utils/flatRouteredFunctions.js new file mode 100644 index 0000000..80f55fb --- /dev/null +++ b/src/utils/flatRouteredFunctions.js @@ -0,0 +1,23 @@ +// convert routered functions to flat routes, +// eg: { fn:1, nestedfn: { test: 2, test2: 3}} -> { fn:1, nestedfn:test: 2, nestedfn:test2: 3} + +export default function flatRouteredFunctions(obj, prefix = "", acc = {}) { + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key)) { + const value = obj[key] + // Determine the new key: if there's a prefix, add it with a colon separator. + const newKey = prefix ? `${prefix}:${key}` : key + // If value is a non-null object (and not an array), recursively flatten it. + if ( + value !== null && + typeof value === "object" && + !Array.isArray(value) + ) { + flatRouteredFunctions(value, newKey, acc) + } else { + acc[newKey] = value + } + } + } + return acc +} diff --git a/src/utils/getRouteredFunctions.js b/src/utils/getRouteredFunctions.js new file mode 100644 index 0000000..2d93204 --- /dev/null +++ b/src/utils/getRouteredFunctions.js @@ -0,0 +1,24 @@ +import fs from "node:fs/promises" +import path from "node:path" + +export default async function getRouteredFunctions(dir) { + const files = await fs.readdir(dir) + + const result = {} + + for (const file of files) { + const filePath = path.join(dir, file) + const stat = await fs.stat(filePath) + + const eventName = path.basename(file).split(".")[0] + + if (stat.isFile()) { + const event = await import(filePath) + result[eventName] = event.default + } else if (stat.isDirectory()) { + result[eventName] = await getRouteredFunctions(filePath) + } + } + + return result +}