support new experimental ws server

This commit is contained in:
SrGooglo 2025-03-25 19:04:00 +00:00
parent d1b1fafd42
commit 048040bd7f
7 changed files with 391 additions and 39 deletions

View File

@ -0,0 +1,158 @@
import HyperExpress from "hyper-express"
class RTEngineNG {
constructor(config = {}) {
this.events = new Map()
if (typeof config.events === "object") {
for (const [event, handler] of Object.entries(config.events)) {
this.events.set(event, handler)
}
}
this.onUpgrade = config.onUpgrade || null
this.onConnection = config.onConnection || null
this.onDisconnection = config.onDisconnection || null
}
clients = new Set()
router = new HyperExpress.Router()
senders = {
broadcast: async (event, data) => {
for (const client of this.clients) {
this.sendMessage(client, event, data)
}
},
}
sendMessage = (socket, event, data) => {
const payload = JSON.stringify({ event, data })
socket.send(payload)
}
sendToTopic = (socket, topic, event, data, self = false) => {
const payload = JSON.stringify({
topic,
event,
data,
})
socket.publish(topic, payload)
if (self === true) {
this.sendMessage(socket, event, data)
}
}
sendError = (socket, error) => {
if (error instanceof Error) {
error = error.toString()
}
this.sendMessage(socket, "error", error)
}
handleMessage = async (socket, payload) => {
let message = null
try {
message = JSON.parse(payload)
if (typeof message.event !== "string") {
return this.sendError(socket, "Invalid event type")
}
const handler = this.events.get(message.event)
if (typeof handler === "function") {
const handlerSenders = {
...this.senders,
toTopic: (room, event, data, self) => {
this.sendToTopic(socket, room, event, data, self)
},
send: (event, data) => {
this.sendMessage(socket, event, data)
},
error: (error) => {
this.sendError(socket, error)
},
}
await handler(socket, message.data, handlerSenders)
} else {
console.log(`[ws] 404 /${message.event}`)
this.sendError(socket, "Event handler not found")
}
} catch (error) {
console.log(`[ws] 500 /${message?.event ?? "unknown"} >`, error)
this.sendError(socket, error)
}
}
handleConnection = async (socket) => {
if (this.onConnection) {
await this.onConnection(socket)
}
socket.on("message", (payload) => this.handleMessage(socket, payload))
socket.on("close", () => this.handleDisconnection(socket))
this.clients.add(socket)
}
handleDisconnection = async (socket) => {
if (this.onDisconnection) {
await this.onDisconnection(socket)
}
this.clients.delete(socket)
}
handleUpgrade = async (req, res) => {
try {
const context = {
id: nanoid(),
token: req.query.token,
user: null,
httpHeaders: req.headers,
}
if (typeof this.onUpgrade === "function") {
await this.onUpgrade(context, req.query.token, res)
} else {
res.upgrade(context)
}
} catch (error) {
console.error("Error upgrading connection:", error)
res.status(401).end()
}
}
registerEvent = (event, handler) => {
this.events.set(event, handler)
}
registerEvents = (obj) => {
for (const [event, handler] of Object.entries(obj)) {
this.registerEvent(event, handler)
}
}
attach = async (engine) => {
this.engine = engine
this.router.ws("/", this.handleConnection)
this.router.upgrade("/", this.handleUpgrade)
this.engine.app.use("/", this.router)
}
close = () => {
// nothing to do, yet...
}
}
export default RTEngineNG

View File

@ -0,0 +1,149 @@
import he from "hyper-express"
import rtengineng from "../../classes/rtengineng"
import getRouteredFunctions from "../../utils/getRouteredFunctions"
import flatRouteredFunctions from "../../utils/flatRouteredFunctions"
export default class HyperExpressEngineNG {
constructor(params, ctx) {
this.params = params
this.ctx = ctx
}
app = null
ws = null
router = null
initialize = async (params) => {
console.warn(
`hyper-express-ng is a experimental engine, some features may not be available or work properly!`,
)
const serverParams = {
max_body_length: 50 * 1024 * 1024, //50MB in bytes,
}
if (params.ssl) {
serverParams.key_file_name = params.ssl?.key ?? null
serverParams.cert_file_name = params.ssl?.cert ?? null
}
this.app = new he.Server(serverParams)
this.router = new he.Router()
// create a router map
if (typeof this.router.map !== "object") {
this.router.map = {}
}
await this.router.any("*", (req, res) => {
return res.status(404).json({
code: 404,
message: "Not found",
})
})
await this.app.use(async (req, res, next) => {
if (req.method === "OPTIONS") {
// handle cors
if (params.ignoreCors) {
res.setHeader("Access-Control-Allow-Methods", "*")
res.setHeader("Access-Control-Allow-Origin", "*")
res.setHeader("Access-Control-Allow-Headers", "*")
}
return res.status(204).end()
}
// register body parser
if (req.headers["content-type"]) {
if (
!req.headers["content-type"].startsWith(
"multipart/form-data",
)
) {
req.body = await req.urlencoded()
req.body = await req.json(req.body)
}
}
})
if (params.enableWebsockets) {
this.ws = new rtengineng({
onUpgrade: params.handleWsUpgrade,
onConnection: params.handleWsConnection,
onDisconnect: params.handleWsDisconnect,
})
await this.ws.attach(this)
}
}
listen = async (params) => {
if (process.env.lb_service) {
let pathOverrides = Object.keys(this.router.map).map((key) => {
return key.split("/")[1]
})
// remove duplicates
pathOverrides = [...new Set(pathOverrides)]
// remove "" and _map
pathOverrides = pathOverrides.filter((key) => {
if (key === "" || key === "_map") {
return false
}
return true
})
if (params.enableWebsockets) {
process.send({
type: "router:ws:register",
id: process.env.lb_service.id,
index: process.env.lb_service.index,
data: {
namespace: params.refName,
listen_port: this.params.listen_port,
ws_path: params.wsPath ?? "/",
},
})
}
if (process.send) {
// try to send router map to host
process.send({
type: "router:register",
id: process.env.lb_service.id,
index: process.env.lb_service.index,
data: {
router_map: this.router.map,
path_overrides: pathOverrides,
listen: {
ip: this.params.listen_ip,
port: this.params.listen_port,
},
},
})
}
}
await this.app.listen(this.params.listen_port)
}
// close must be synchronous
close = () => {
if (this.ws && typeof this.ws.close === "function") {
this.ws.close()
}
if (typeof this.app.close === "function") {
this.app.close()
}
if (typeof this.ctx.onClose === "function") {
this.ctx.onClose()
}
}
}

9
src/engines/index.js Normal file
View File

@ -0,0 +1,9 @@
import HyperExpress from "./hyper-express"
import HyperExpressNG from "./hyper-express-ng"
import Worker from "./worker"
export default {
"hyper-express": HyperExpress,
"hyper-express-ng": HyperExpressNG,
worker: Worker,
}

View File

@ -1,36 +1,28 @@
import fs from "node:fs"
import RecursiveRegister from "../../lib/recursiveRegister"
import getRouteredFunctions from "../../utils/getRouteredFunctions"
import flatRouteredFunctions from "../../utils/flatRouteredFunctions"
export default async (startDir, engine) => {
if (!engine.ws) {
if (!engine.ws || !fs.existsSync(startDir)) {
return engine
}
if (!fs.existsSync(startDir)) {
let events = await getRouteredFunctions(startDir)
events = flatRouteredFunctions(events)
if (typeof events !== "object") {
return engine
}
await RecursiveRegister({
start: startDir,
match: async (filePath) => {
return filePath.endsWith(".js") || filePath.endsWith(".ts")
},
onMatch: async ({ absolutePath, relativePath }) => {
let eventName = relativePath.split("/").join(":")
eventName = eventName.replace(".js", "")
eventName = eventName.replace(".ts", "")
let fn = require(absolutePath)
fn = fn.default ?? fn
console.log(`[WEBSOCKET] register event : ${eventName} >`, fn)
engine.ws.events.set(eventName, fn)
if (typeof engine.ws.registerEvents === "function") {
await engine.ws.registerEvents(events)
} else {
for (const eventKey of Object.keys(events)) {
engine.ws.events.set(eventKey, events[eventKey])
}
}
})
return engine
}

View File

@ -13,17 +13,7 @@ import registerBaseEndpoints from "./initializators/registerBaseEndpoints"
import registerWebsocketsEvents from "./initializators/registerWebsocketsEvents"
import registerHttpRoutes from "./initializators/registerHttpRoutes"
async function loadEngine(engine) {
const enginesPath = path.resolve(__dirname, "engines")
const selectedEnginePath = path.resolve(enginesPath, engine)
if (!fs.existsSync(selectedEnginePath)) {
throw new Error(`Engine ${engine} not found!`)
}
return require(selectedEnginePath).default
}
import Engines from "./engines"
class Server {
constructor(params = {}, controllers = {}, middlewares = {}, headers = {}) {
@ -145,6 +135,9 @@ class Server {
const engineParams = {
...this.params,
handleWsUpgrade: this.handleWsUpgrade,
handleWsConnection: this.handleWsConnection,
handleWsDisconnect: this.handleWsDisconnect,
handleWsAuth: this.handleWsAuth,
handleAuth: this.handleHttpAuth,
requireAuth: this.constructor.requireHttpAuth,
@ -153,7 +146,11 @@ class Server {
}
// initialize engine
this.engine = await loadEngine(this.params.useEngine)
this.engine = Engines[this.params.useEngine]
if (!this.engine) {
throw new Error(`Engine ${this.params.useEngine} not found`)
}
this.engine = new this.engine(engineParams, this)

View File

@ -0,0 +1,23 @@
// convert routered functions to flat routes,
// eg: { fn:1, nestedfn: { test: 2, test2: 3}} -> { fn:1, nestedfn:test: 2, nestedfn:test2: 3}
export default function flatRouteredFunctions(obj, prefix = "", acc = {}) {
for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key)) {
const value = obj[key]
// Determine the new key: if there's a prefix, add it with a colon separator.
const newKey = prefix ? `${prefix}:${key}` : key
// If value is a non-null object (and not an array), recursively flatten it.
if (
value !== null &&
typeof value === "object" &&
!Array.isArray(value)
) {
flatRouteredFunctions(value, newKey, acc)
} else {
acc[newKey] = value
}
}
}
return acc
}

View File

@ -0,0 +1,24 @@
import fs from "node:fs/promises"
import path from "node:path"
export default async function getRouteredFunctions(dir) {
const files = await fs.readdir(dir)
const result = {}
for (const file of files) {
const filePath = path.join(dir, file)
const stat = await fs.stat(filePath)
const eventName = path.basename(file).split(".")[0]
if (stat.isFile()) {
const event = await import(filePath)
result[eventName] = event.default
} else if (stat.isDirectory()) {
result[eventName] = await getRouteredFunctions(filePath)
}
}
return result
}