From f2076b5abe2bdf09ed012936342ce8c9f795ba5a Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Thu, 9 May 2024 21:37:14 +0000 Subject: [PATCH] reimplement initializators --- package.json | 10 +- src/server/classes/rtengine/index.js | 325 ++++++++---------- src/server/engines/hyper-express/index.js | 44 ++- .../registerBaseEndpoints/index.js | 17 + .../registerHttpRoutes/index.js | 79 +++++ .../registerWebsocketsEvents/index.js | 36 ++ src/server/lib/recursiveRegister/index.js | 37 ++ src/server/lib/redis_map/index.js | 237 ++++++------- src/server/server.js | 251 +++----------- 9 files changed, 500 insertions(+), 536 deletions(-) create mode 100644 src/server/initializators/registerBaseEndpoints/index.js create mode 100644 src/server/initializators/registerHttpRoutes/index.js create mode 100644 src/server/initializators/registerWebsocketsEvents/index.js create mode 100644 src/server/lib/recursiveRegister/index.js diff --git a/package.json b/package.json index 0c499c2..44c6db0 100755 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "linebridge", - "version": "0.19.1", + "version": "0.20.0", "description": "API Framework for RageStudio backends", "author": "RageStudio", "main": "./dist/client/index.js", @@ -33,16 +33,18 @@ "axios": "^1.6.7", "axios-retry": "3.4.0", "cors": "2.8.5", + "dotenv": "^16.4.4", "express": "^4.18.3", - "hyper-express": "^6.14.12", + "hyper-express": "^6.16.1", "ioredis": "^5.3.2", "md5": "^2.3.0", "module-alias": "2.2.2", "morgan": "1.10.0", + "signal-exit": "^4.1.0", "socket.io": "^4.7.4", "socket.io-client": "4.5.4", - "uuid": "^9.0.1", - "sucrase": "^3.35.0" + "sucrase": "^3.35.0", + "uuid": "^9.0.1" }, "devDependencies": { "@ragestudio/hermes": "^0.1.0", diff --git a/src/server/classes/rtengine/index.js b/src/server/classes/rtengine/index.js index cc57123..40b3da9 100755 --- a/src/server/classes/rtengine/index.js +++ b/src/server/classes/rtengine/index.js @@ -1,83 +1,147 @@ import cluster from "node:cluster" import redis from "ioredis" - +import SocketIO from "socket.io" import { EventEmitter } from "@foxify/events" -import { createAdapter as createRedisAdapter } from "@socket.io/redis-adapter" -import { createAdapter as createClusterAdapter } from "@socket.io/cluster-adapter" -import { setupWorker } from "@socket.io/sticky" -import { Emitter } from "@socket.io/redis-emitter" - import RedisMap from "../../lib/redis_map" export default class RTEngineServer { constructor(params = {}) { this.params = params - - this.io = this.params.io ?? undefined - this.redis = this.params.redis ?? undefined - this.redisEmitter = null - this.clusterMode = !!cluster.isWorker - this.connections = null - this.users = null + this.redisConnParams = { + host: this.params.redisOptions?.host ?? process.env.REDIS_HOST ?? "localhost", + port: this.params.redisOptions?.port ?? process.env.REDIS_PORT ?? 6379, + username: this.params.redisOptions?.username ?? (process.env.REDIS_AUTH && process.env.REDIS_AUTH.split(":")[0]), + password: this.params.redisOptions?.password ?? (process.env.REDIS_AUTH && process.env.REDIS_AUTH.split(":")[1]), + db: this.params.redisOptions?.db ?? process.env.REDIS_DB ?? 0 + } + + this.redis = params.redis + this.io = params.io + } + + worker_id = nanoid() + + io = null + redis = null + + connections = null + users = null + + events = new Map() + + async initialize() { + console.log("🌐 Initializing RTEngine server...") if (!this.io) { - throw new Error("No io provided") + this.io = new SocketIO.Server({ + path: this.params.root ?? "/", + }) + } + + if (!this.redis) { + this.redis = new redis({ + host: this.redisConnParams.host, + port: this.redisConnParams.port, + username: this.redisConnParams.username, + password: this.redisConnParams.password, + db: this.redisConnParams.db, + }) + } + + // create mappers + this.connections = new RedisMap(this.redis, { + refKey: "connections", + worker_id: this.worker_id, + }) + + this.users = new RedisMap(this.redis, { + refKey: "users", + worker_id: this.worker_id, + }) + + // register middlewares + if (typeof this.middlewares === "object" && Array.isArray(this.middlewares)) { + for (const middleware of this.middlewares) { + this.io.use(middleware) + } + } + + // handle connection + this.io.on("connection", (socket) => { + this.eventHandler(this.onConnect, socket) + }) + + console.log(`[RTEngine] Listening...`) + console.log(`[RTEngine] Universal worker id [${this.worker_id}]`) + + return true + } + + close = () => { + console.log(`Cleaning up RTEngine server...`) + + // WARN: Do not flush connections pls + if (process.env.NODE_ENV !== "production") { + console.log(`Flushing previus connections... (Only for dev mode)`) + this.connections.flush() + } + + if (this.clusterMode) { + this.connections.flush(cluster.worker.id) + } + + if (this.io) { + this.io.close() + } + + if (this.redis) { + this.redis.quit() } } onConnect = async (socket) => { - console.log(`šŸ¤ New client connected on socket id [${socket.id}]`) + console.log(`[RTEngine] new:client | id [${socket.id}]`) - socket.eventEmitter = new EventEmitter() + // create eventBus + socket.eventBus = new EventEmitter() + socket.pendingTimeouts = new Set() + // register events if (typeof this.events === "object") { - for (const event in this.events) { - socket.on(event, (...args) => { - this.eventHandler(this.events[event], socket, ...args) + for (const [key, handler] of this.events.entries()) { + socket.on(key, (...args) => { + this.eventHandler(handler, socket, ...args) }) } } - socket.on("disconnect", (_socket) => { + // handle ping + socket.on("ping", () => { + socket.emit("pong") + }) + + // handle disconnect + socket.on("disconnect", () => { this.eventHandler(this.onDisconnect, socket) }) - const conn_obj = { - id: socket.id, - } - - if (this.clusterMode) { - conn_obj.worker_id = cluster.worker.id - conn_obj._remote = true - - this.redisEmitter.serverSideEmit(`redis:conn:set`, conn_obj) - } - - await this.connections.set(conn_obj.id, conn_obj) - - console.log(`āš™ļø Awaiting authentication for client [${socket.id}]`) + await this.connections.set(socket.id, socket) if (this.params.requireAuth) { - await this.authenticateClient(socket, null, (this.params.handleAuth ?? this.handleAuth)) + await this.onAuth(socket, null, (this.params.handleAuth ?? this.handleAuth)) } else if (socket.handshake.auth.token ?? socket.handshake.query.auth) { - await this.authenticateClient(socket, (socket.handshake.auth.token ?? socket.handshake.query.auth), (this.params.handleAuth ?? this.handleAuth)) - } - - if (process.env.NODE_ENV === "development") { - const connected_size = await this.connections.size() - - console.log(`Total connected clients: ${connected_size}`) + await this.onAuth(socket, (socket.handshake.auth.token ?? socket.handshake.query.auth), (this.params.handleAuth ?? this.handleAuth)) } } onDisconnect = async (socket,) => { - console.log(`šŸ‘‹ Client disconnected on socket id [${socket.id}]`) + console.log(`[RTEngine] disconnect:client | id [${socket.id}]`) - if (socket.eventEmitter.emit) { - socket.eventEmitter.emit("disconnect") + if (socket.eventBus.emit) { + socket.eventBus.emit("disconnect") } else { console.warn(`[${socket.id}][@${socket.userData.username}] Cannot emit disconnect event`) } @@ -91,15 +155,11 @@ export default class RTEngineServer { } await this.connections.del(socket.id) - - const connected_size = await this.connections.size() - - console.log(`Total connected clients: ${connected_size}`) } - authenticateClient = async (socket, token, handleAuth) => { + onAuth = async (socket, token, handleAuth) => { if (typeof handleAuth !== "function") { - console.warn(`Skipping authentication for client [${socket.id}] due no auth handler provided`) + console.log(`[RTEngine] [${socket.id}] No auth handler provided`) return false } @@ -113,7 +173,7 @@ export default class RTEngineServer { } function err(code, message) { - console.error(`šŸ›‘ Disconecting client [${socket.id}] cause an auth error >`, code, message) + console.log(`[RTEngine] [${socket.id}] Auth error: ${code} >`, message) socket.emit("response:error", { code, @@ -132,7 +192,7 @@ export default class RTEngineServer { const authResult = await handleAuth(socket, token, err) if (authResult) { - const conn = await this.connections.update(socket.id, authResult) + const conn = await this.connections.has(socket.id) // check if connection update is valid to avoid race condition(When user disconnect before auth verification is completed) if (!conn) { @@ -140,14 +200,29 @@ export default class RTEngineServer { return false } - this.users.set(authResult.user_id, { + this.users.set(authResult.user_id.toString(), { socket_id: socket.id, ...authResult, }) socket.emit("response:auth:ok") - console.log(`āœ… Authenticated client [${socket.id}] as [@${authResult.username}]`) + console.log(`[RTEngine] client:authenticated | socket_id [${socket.id}] | user_id [${authResult.user_id}] | username [@${authResult.username}]`) + } + } + + eventHandler = async (fn, socket, payload) => { + try { + await fn(socket, payload, this) + } catch (error) { + console.error(error) + + if (typeof socket.emit === "function") { + socket.emit("response:error", { + code: 500, + message: error.message, + }) + } } } @@ -167,144 +242,18 @@ export default class RTEngineServer { userById: async (user_id) => { const user = await this.users.get(user_id) - console.log(user) - return user - } - } + }, + socketByUserId: async (user_id) => { + const user = await this.users.get(user_id) - eventHandler = async (fn, socket, ...args) => { - try { - await fn(socket, ...args) - } catch (error) { - console.error(error) - - if (socket.emit) { - socket.emit("response:error", { - code: 500, - message: error.message, - }) + if (!user) { + return null } - } - } - registerBaseEndpoints = (socket) => { - if (!socket) { + const socket = await this.connections.get(user.socket_id) + return socket } - - socket.on("ping", () => { - socket.emit("pong") - }) - - return socket - } - - async initialize({ host, port, username, password, db } = {}) { - console.log("🌐 Initializing RTEngine server...") - - process.on("exit", this.cleanUp) - process.on("SIGINT", this.cleanUp) - process.on("SIGTERM", this.cleanUp) - process.on("SIGBREAK", this.cleanUp) - process.on("SIGHUP", this.cleanUp) - - // fullfill args - if (typeof host === "undefined") { - host = this.params.redis?.host ?? process.env.REDIS_HOST ?? "localhost" - } - - if (typeof port === "undefined") { - port = this.params.redis?.port ?? process.env.REDIS_PORT ?? 6379 - } - - if (typeof username === "undefined") { - username = this.params.redis?.username ?? process.env.REDIS_USERNAME ?? (process.env.REDIS_AUTH && process.env.REDIS_AUTH.split(":")[0]) - } - - if (typeof password === "undefined") { - password = this.params.redis?.password ?? process.env.REDIS_PASSWORD ?? (process.env.REDIS_AUTH && process.env.REDIS_AUTH.split(":")[1]) - } - - if (typeof db === "undefined") { - db = this.params.redis?.db ?? process.env.REDIS_DB ?? 0 - } - - // create default servers - if (typeof this.redis === "undefined") { - this.redis = new redis({ - host, - port, - username: username, - password: password, - db: db, - }) - } - - // create mappers - this.connections = new RedisMap(this.redis, { - refKey: "connections", - }) - - this.users = new RedisMap(this.redis, { - refKey: "users", - }) - - // setup clustered mode - if (this.clusterMode) { - console.log(`Connecting to redis as cluster worker id [${cluster.worker.id}]`) - - this.io.adapter(createClusterAdapter()) - - const subClient = this.redis.duplicate() - - this.io.adapter(createRedisAdapter(this.redis, subClient)) - - setupWorker(this.io) - - this.redisEmitter = new Emitter(this.redis) - } - - // WARN: Do not flush connections pls - if (process.env.NODE_ENV !== "production") { - console.log(`Flushing previus connections... (Only for dev mode)`) - await this.connections.flush() - } - - // register middlewares - if (typeof this.middlewares === "object" && Array.isArray(this.middlewares)) { - for (const middleware of this.middlewares) { - this.io.use(middleware) - } - } - - for (const event in this._redisEvents) { - this.io.on(event, this._redisEvents[event]) - } - - this.io.on("connection", (socket) => { - this.registerBaseEndpoints(socket) - this.eventHandler(this.onConnect, socket) - }) - - if (typeof this.onInit === "function") { - await this.onInit() - } - - console.log(`āœ… RTEngine server is running on port [${this.params.listen_port}] ${this.clusterMode ? `on clustered mode [${cluster.worker.id}]` : ""}`) - - return true - } - - cleanUp = async () => { - console.log(`Cleaning up RTEngine server...`) - - if (this.clusterMode) { - this.connections.flush(cluster.worker.id) - } - - if (this.io) { - this.io.close() - } } } \ No newline at end of file diff --git a/src/server/engines/hyper-express/index.js b/src/server/engines/hyper-express/index.js index b08b0af..0af9ea7 100644 --- a/src/server/engines/hyper-express/index.js +++ b/src/server/engines/hyper-express/index.js @@ -1,6 +1,5 @@ import he from "hyper-express" import rtengine from "../../classes/rtengine" -import SocketIO from "socket.io" export default class Engine { constructor(params) { @@ -13,11 +12,14 @@ export default class Engine { router = new he.Router() - io = null - ws = null - init = async (params) => { + initialize = async (params) => { + // create a router map + if (typeof this.router.map !== "object") { + this.router.map = {} + } + // register 404 await this.router.any("*", (req, res) => { return res.status(404).json({ @@ -26,6 +28,14 @@ export default class Engine { }) }) + this.app.use((req, res, next) => { + if (req.method === "OPTIONS") { + return res.status(204).end() + } + + next() + }) + // register body parser await this.app.use(async (req, res, next) => { if (req.headers["content-type"]) { @@ -37,17 +47,15 @@ export default class Engine { }) if (!params.disableWebSockets) { - this.io = new SocketIO.Server({ - path: `/${params.refName}`, - }) - - this.io.attachApp(this.app.uws_instance) - - this.ws = global.rtengine = new rtengine({ + this.ws = global.websocket = new rtengine({ ...params, handleAuth: params.handleWsAuth, - io: this.io, + root: `/${params.refName}` }) + + this.ws.initialize() + + await this.ws.io.attachApp(this.app.uws_instance) } } @@ -106,10 +114,16 @@ export default class Engine { } close = async () => { - if (this.io) { - this.io.close() + if (this.ws.events) { + this.ws.clear() } - await this.app.close() + if (typeof this.ws?.close === "function") { + await this.ws.close() + } + + if (typeof this.app?.close === "function") { + await this.app.close() + } } } \ No newline at end of file diff --git a/src/server/initializators/registerBaseEndpoints/index.js b/src/server/initializators/registerBaseEndpoints/index.js new file mode 100644 index 0000000..10ec3ca --- /dev/null +++ b/src/server/initializators/registerBaseEndpoints/index.js @@ -0,0 +1,17 @@ +import fs from "node:fs" +import path from "node:path" + +export default async (ctx) => { + const scanPath = path.join(__dirname, "../../", "baseEndpoints") + const files = fs.readdirSync(scanPath) + + for await (const file of files) { + if (file === "index.js") { + continue + } + + let endpoint = require(path.join(scanPath, file)).default + + new endpoint(ctx) + } +} \ No newline at end of file diff --git a/src/server/initializators/registerHttpRoutes/index.js b/src/server/initializators/registerHttpRoutes/index.js new file mode 100644 index 0000000..5069332 --- /dev/null +++ b/src/server/initializators/registerHttpRoutes/index.js @@ -0,0 +1,79 @@ +import fs from "node:fs" + +import Endpoint from "../../classes/endpoint" +import RecursiveRegister from "../../lib/recursiveRegister" + +const parametersRegex = /\[([a-zA-Z0-9_]+)\]/g + +export default async (startDir, engine, ctx) => { + if (!fs.existsSync(startDir)) { + return engine + } + + await RecursiveRegister({ + start: startDir, + match: async (filePath) => { + return filePath.endsWith(".js") || filePath.endsWith(".ts") + }, + onMatch: async ({ absolutePath, relativePath }) => { + const paths = relativePath.split("/") + + let method = paths[paths.length - 1].split(".")[0].toLocaleLowerCase() + let route = paths.slice(0, paths.length - 1).join("/") + + // parse parametrized routes + route = route.replace(parametersRegex, ":$1") + route = route.replace("[$]", "*") + + // clean up + route = route.replace(".js", "") + route = route.replace(".ts", "") + + // check if route ends with index + if (route.endsWith("/index")) { + route = route.replace("/index", "") + } + + // add leading slash + route = `/${route}` + + // import route + let fn = require(absolutePath) + + fn = fn.default ?? fn + + if (typeof fn !== "function") { + if (!fn.fn) { + console.warn(`Missing fn handler in [${method}][${route}]`) + return false + } + + if (Array.isArray(fn.useContext)) { + let contexts = {} + + for (const context of fn.useContext) { + contexts[context] = ctx.contexts[context] + } + + fn.contexts = contexts + + fn.fn.bind({ contexts }) + } + } + + new Endpoint( + ctx, + { + route: route, + enabled: true, + middlewares: fn.middlewares, + handlers: { + [method]: fn.fn ?? fn, + } + } + ) + } + }) + + return engine +} \ No newline at end of file diff --git a/src/server/initializators/registerWebsocketsEvents/index.js b/src/server/initializators/registerWebsocketsEvents/index.js new file mode 100644 index 0000000..963d78c --- /dev/null +++ b/src/server/initializators/registerWebsocketsEvents/index.js @@ -0,0 +1,36 @@ +import fs from "node:fs" + +import RecursiveRegister from "../../lib/recursiveRegister" + +export default async (startDir, engine) => { + if (!engine.ws) { + return engine + } + + if (!fs.existsSync(startDir)) { + return engine + } + + await RecursiveRegister({ + start: startDir, + match: async (filePath) => { + return filePath.endsWith(".js") || filePath.endsWith(".ts") + }, + onMatch: async ({ absolutePath, relativePath }) => { + let eventName = relativePath.split("/").join(":") + + eventName = eventName.replace(".js", "") + eventName = eventName.replace(".ts", "") + + let fn = require(absolutePath) + + fn = fn.default ?? fn + + console.log(`[WEBSOCKET] register event : ${eventName} >`, fn) + + engine.ws.events.set(eventName, fn) + } + }) + + return engine +} \ No newline at end of file diff --git a/src/server/lib/recursiveRegister/index.js b/src/server/lib/recursiveRegister/index.js new file mode 100644 index 0000000..e06969a --- /dev/null +++ b/src/server/lib/recursiveRegister/index.js @@ -0,0 +1,37 @@ +import path from "node:path" +import fs from "node:fs" + +export default async ({ + start, + match, + onMatch, +}) => { + const filterFrom = start.split("/").pop() + + async function registerPath(_path) { + const files = await fs.promises.readdir(_path) + + for await (const file of files) { + const filePath = path.join(_path, file) + + const stat = await fs.promises.stat(filePath) + + if (stat.isDirectory()) { + await registerPath(filePath) + + continue + } else { + const isMatch = await match(filePath) + + if (isMatch) { + await onMatch({ + absolutePath: filePath, + relativePath: filePath.split("/").slice(filePath.split("/").indexOf(filterFrom) + 1).join("/"), + }) + } + } + } + } + + await registerPath(start) +} \ No newline at end of file diff --git a/src/server/lib/redis_map/index.js b/src/server/lib/redis_map/index.js index 56f4e89..254d873 100755 --- a/src/server/lib/redis_map/index.js +++ b/src/server/lib/redis_map/index.js @@ -4,39 +4,50 @@ export default class RedisMap { throw new Error("redis client is required") } + if (!params.refKey) { + throw new Error("refKey is required") + } + + if (!params.worker_id) { + throw new Error("worker_id is required") + } + this.redis = redis this.params = params this.refKey = this.params.refKey - - if (!this.refKey) { - throw new Error("refKey is required") - } + this.worker_id = this.params.worker_id } + localMap = new Map() + set = async (key, value) => { if (!key) { - console.warn(`[redis:${this.refKey}] Failed to set entry with no key`) + console.warn(`[redismap] (${this.refKey}) Failed to set entry with no key`) return } if (!value) { - console.warn(`[redis:${this.refKey}] Failed to set entry [${key}] with no value`) + console.warn(`[redismap] (${this.refKey}) Failed to set entry [${key}] with no value`) return } const redisKey = `${this.refKey}:${key}` - //console.log(`[redis:${this.refKey}] Setting entry [${key}]`,) + this.localMap.set(key, value) - await this.redis.hset(redisKey, value) + // console.log(`[redismap] (${this.refKey}) Set entry [${key}] to [${value}]`) + + await this.redis.hset(redisKey, { + worker_id: this.worker_id, + }) return value } get = async (key, value) => { if (!key) { - console.warn(`[redis:${this.refKey}] Failed to get entry with no key`) + console.warn(`[redismap] (${this.refKey}) Failed to get entry with no key`) return } @@ -44,58 +55,24 @@ export default class RedisMap { let result = null - if (value) { - result = await this.redis.hget(redisKey, value) + if (this.localMap.has(key)) { + result = this.localMap.get(key) } else { - result = await this.redis.hgetall(redisKey) - } + const remoteWorkerID = await this.redis.hget(redisKey, value) - if (Object.keys(result).length === 0) { - result = null + if (!remoteWorkerID) { + return null + } + + throw new Error("Redis stream data, not implemented...") } return result } - getMany = async (keys) => { - if (!keys) { - console.warn(`[redis:${this.refKey}] Failed to get entry with no key`) - return - } - - const redisKeys = keys.map((key) => `${this.refKey}:${key}`) - - const pipeline = this.redis.pipeline() - - for (const redisKey of redisKeys) { - pipeline.hgetall(redisKey) - } - - let results = await pipeline.exec() - - results = results.map((result) => { - return result[1] - }) - - // delete null or empty objects - results = results.filter((result) => { - if (result === null) { - return false - } - - if (Object.keys(result).length === 0) { - return false - } - - return true - }) - - return results - } - del = async (key) => { if (!key) { - console.warn(`[redis:${this.refKey}] Failed to delete entry with no key`) + console.warn(`[redismap] (${this.refKey}) Failed to delete entry with no key`) return false } @@ -107,37 +84,18 @@ export default class RedisMap { return false } - await this.redis.hdel(redisKey, Object.keys(data)) + if (this.localMap.has(key)) { + this.localMap.delete(key) + } + + await this.redis.hdel(redisKey, ["worker_id"]) return true } - getAll = async () => { - let map = [] - - let nextIndex = 0 - - do { - const [nextIndexAsStr, results] = await this.redis.scan( - nextIndex, - "MATCH", - `${this.refKey}:*`, - "COUNT", - 100 - ) - - nextIndex = parseInt(nextIndexAsStr, 10) - - map = map.concat(results) - - } while (nextIndex !== 0) - - return map - } - update = async (key, data) => { if (!key) { - console.warn(`[redis:${this.refKey}] Failed to update entry with no key`) + console.warn(`[redismap] (${this.refKey}) Failed to update entry with no key`) return } @@ -146,7 +104,7 @@ export default class RedisMap { let new_data = await this.get(key) if (!new_data) { - console.warn(`[redis:${this.refKey}] Object [${key}] not exist, nothing to update`) + console.warn(`[redismap] (${this.refKey}) Object [${key}] not exist, nothing to update`) return false } @@ -156,68 +114,93 @@ export default class RedisMap { ...data, } - await this.redis.hset(redisKey, new_data) + //console.log(`[redismap] (${this.refKey}) Object [${key}] updated`) + + this.localMap.set(key, new_data) + + await this.redis.hset(redisKey, { + worker_id: this.worker_id, + }) return new_data } - flush = async (worker_id) => { - let nextIndex = 0 + has = async (key) => { + if (!key) { + console.warn(`[redismap] (${this.refKey}) Failed to check entry with no key`) + return false + } - do { - const [nextIndexAsStr, results] = await this.redis.scan( - nextIndex, - "MATCH", - `${this.refKey}:*`, - "COUNT", - 100 - ) + const redisKey = `${this.refKey}:${key}` - nextIndex = parseInt(nextIndexAsStr, 10) + if (this.localMap.has(key)) { + return true + } - const pipeline = this.redis.pipeline() + if (await this.redis.hget(redisKey, "worker_id")) { + return true + } - for await (const key of results) { - const key_id = key.split(this.refKey + ":")[1] - - const data = await this.get(key_id) - - if (!data) { - continue - } - - if (worker_id) { - if (data.worker_id !== worker_id) { - continue - } - } - - pipeline.hdel(key, Object.keys(data)) - } - - await pipeline.exec() - } while (nextIndex !== 0) + return false } - size = async () => { - let count = 0 + // flush = async (worker_id) => { + // let nextIndex = 0 - let nextIndex = 0 + // do { + // const [nextIndexAsStr, results] = await this.redis.scan( + // nextIndex, + // "MATCH", + // `${this.refKey}:*`, + // "COUNT", + // 100 + // ) - do { - const [nextIndexAsStr, results] = await this.redis.scan( - nextIndex, - "MATCH", - `${this.refKey}:*`, - "COUNT", - 100 - ) + // nextIndex = parseInt(nextIndexAsStr, 10) - nextIndex = parseInt(nextIndexAsStr, 10) + // const pipeline = this.redis.pipeline() - count = count + results.length - } while (nextIndex !== 0) + // for await (const key of results) { + // const key_id = key.split(this.refKey + ":")[1] - return count - } + // const data = await this.get(key_id) + + // if (!data) { + // continue + // } + + // if (worker_id) { + // if (data.worker_id !== worker_id) { + // continue + // } + // } + + // pipeline.hdel(key, Object.keys(data)) + // } + + // await pipeline.exec() + // } while (nextIndex !== 0) + // } + + // size = async () => { + // let count = 0 + + // let nextIndex = 0 + + // do { + // const [nextIndexAsStr, results] = await this.redis.scan( + // nextIndex, + // "MATCH", + // `${this.refKey}:*`, + // "COUNT", + // 100 + // ) + + // nextIndex = parseInt(nextIndexAsStr, 10) + + // count = count + results.length + // } while (nextIndex !== 0) + + // return count + // } } \ No newline at end of file diff --git a/src/server/server.js b/src/server/server.js index 5821434..ec3b051 100755 --- a/src/server/server.js +++ b/src/server/server.js @@ -3,13 +3,16 @@ import("./patches") import fs from "node:fs" import path from "node:path" import { EventEmitter } from "@foxify/events" - -import Endpoint from "./classes/endpoint" +import { onExit } from "signal-exit" import defaults from "./defaults" import IPCClient from "./classes/IPCClient" +import registerBaseEndpoints from "./initializators/registerBaseEndpoints" +import registerWebsocketsEvents from "./initializators/registerWebsocketsEvents" +import registerHttpRoutes from "./initializators/registerHttpRoutes" + async function loadEngine(engine) { const enginesPath = path.resolve(__dirname, "engines") @@ -27,7 +30,8 @@ class Server { this.isExperimental = defaults.isExperimental ?? false if (this.isExperimental) { - console.warn("🚧 This version of Linebridge is experimental! 🚧") + console.warn("\n🚧 This version of Linebridge is experimental! 🚧") + console.warn(`Version: ${defaults.version}\n`) } this.params = { @@ -53,15 +57,15 @@ class Server { // fix and fulfill params this.params.useMiddlewares = this.params.useMiddlewares ?? [] this.params.name = this.constructor.refName ?? this.params.refName - this.params.useEngine = this.constructor.useEngine ?? this.params.useEngine ?? "express" + this.params.useEngine = this.constructor.useEngine ?? this.params.useEngine ?? "hyper-express" this.params.listen_ip = this.constructor.listenIp ?? this.constructor.listen_ip ?? this.params.listen_ip ?? "0.0.0.0" this.params.listen_port = this.constructor.listenPort ?? this.constructor.listen_port ?? this.params.listen_port ?? 3000 this.params.http_protocol = this.params.http_protocol ?? "http" this.params.http_address = `${this.params.http_protocol}://${defaults.localhost_address}:${this.params.listen_port}` this.params.disableWebSockets = this.constructor.disableWebSockets ?? this.params.disableWebSockets ?? false - this.params.routesPath = this.constructor.routesPath ?? this.params.routesPath - this.params.wsRoutesPath = this.constructor.wsRoutesPath ?? this.params.wsRoutesPath + this.params.routesPath = this.constructor.routesPath ?? this.params.routesPath ?? path.resolve(process.cwd(), "routes") + this.params.wsRoutesPath = this.constructor.wsRoutesPath ?? this.params.wsRoutesPath ?? path.resolve(process.cwd(), "routes_ws") return this } @@ -77,6 +81,11 @@ class Server { eventBus = new EventEmitter() initialize = async () => { + onExit((code, signal) => { + this.engine.close() + process.exit(code) + }) + const startHrTime = process.hrtime() // register events @@ -103,47 +112,48 @@ class Server { this.engine = new this.engine(engineParams) - if (typeof this.engine.init === "function") { - await this.engine.init(engineParams) + if (typeof this.engine.initialize === "function") { + await this.engine.initialize(engineParams) } - // create a router map - if (typeof this.engine.router.map !== "object") { - this.engine.router.map = {} + // check if ws events are defined + if (typeof this.wsEvents !== "undefined") { + if (!this.engine.ws) { + console.warn("`wsEvents` detected, but Websockets are not enabled! Ignoring...") + } else { + for (const [eventName, eventHandler] of Object.entries(this.wsEvents)) { + this.engine.ws.events.set(eventName, eventHandler) + } + } } // try to execute onInitialize hook if (typeof this.onInitialize === "function") { - await this.onInitialize() + try { + await this.onInitialize() + } + catch (err) { + console.error(err) + process.exit(1) + } } - // set server defined headers + // set defaults this.useDefaultHeaders() - - // set server defined middlewares this.useDefaultMiddlewares() - // register controllers - await this.initializeControllers() + // register http & ws routes + this.engine = await registerHttpRoutes(this.params.routesPath, this.engine, this) + this.engine = await registerWebsocketsEvents(this.params.wsRoutesPath, this.engine) - // register routes - await this.initializeRoutes() - - // register main index endpoint `/` - await this.registerBaseEndpoints() + // register base endpoints if enabled + if (!this.params.disableBaseEndpoint) { + await registerBaseEndpoints(this) + } // use main router await this.engine.app.use(this.engine.router) - // initialize websocket init hook if needed - if (this.engine.ws) { - if (typeof this.engine.ws?.initialize == "function") { - await this.engine.ws.initialize({ - redisInstance: this.redis - }) - } - } - // if is a linebridge service then initialize IPC Channels if (process.env.lb_service) { await this.initializeIpc() @@ -183,6 +193,7 @@ class Server { useDefaultMiddlewares = async () => { const middlewares = await this.resolveMiddlewares([ ...this.params.useMiddlewares, + ...this.useMiddlewares ?? [], ...defaults.useMiddlewares, ]) @@ -191,133 +202,6 @@ class Server { }) } - initializeControllers = async () => { - const controllers = Object.entries(this.controllers) - - for await (let [key, controller] of controllers) { - if (typeof controller !== "function") { - throw new Error(`Controller must use the controller class!`) - } - - if (controller.disabled) { - console.warn(`ā© Controller [${controller.name}] is disabled! Initialization skipped...`) - continue - } - - try { - const ControllerInstance = new controller() - - // get endpoints from controller (ComplexController) - const HTTPEndpoints = ControllerInstance.__get_http_endpoints() - const WSEndpoints = ControllerInstance.__get_ws_endpoints() - - HTTPEndpoints.forEach((endpoint) => { - this.register.http(endpoint, ...this.resolveMiddlewares(controller.useMiddlewares)) - }) - - // WSEndpoints.forEach((endpoint) => { - // this.registerWSEndpoint(endpoint) - // }) - } catch (error) { - console.error(`\n\x1b[41m\x1b[37mšŸ†˜ [${controller.refName ?? controller.name}] Controller initialization failed:\x1b[0m ${error.stack} \n`) - } - } - } - - initializeRoutes = async (filePath) => { - if (!this.params.routesPath) { - return false - } - - const scanPath = filePath ?? this.params.routesPath - - const files = fs.readdirSync(scanPath) - - for await (const file of files) { - const filePath = `${scanPath}/${file}` - - const stat = fs.statSync(filePath) - - if (stat.isDirectory()) { - await this.initializeRoutes(filePath) - - continue - } else if (file.endsWith(".js") || file.endsWith(".jsx") || file.endsWith(".ts") || file.endsWith(".tsx")) { - let splitedFilePath = filePath.split("/") - - splitedFilePath = splitedFilePath.slice(splitedFilePath.indexOf("routes") + 1) - - const method = splitedFilePath[splitedFilePath.length - 1].split(".")[0].toLocaleLowerCase() - - splitedFilePath = splitedFilePath.slice(0, splitedFilePath.length - 1) - - // parse parametrized routes - const parametersRegex = /\[([a-zA-Z0-9_]+)\]/g - - splitedFilePath = splitedFilePath.map((route) => { - if (route.match(parametersRegex)) { - route = route.replace(parametersRegex, ":$1") - } - - route = route.replace("[$]", "*") - - return route - }) - - let route = splitedFilePath.join("/") - - route = route.replace(".jsx", "") - route = route.replace(".js", "") - route = route.replace(".ts", "") - route = route.replace(".tsx", "") - - if (route.endsWith("/index")) { - route = route.replace("/index", "") - } - - route = `/${route}` - - // import route - let routeFile = require(filePath) - - routeFile = routeFile.default ?? routeFile - - if (typeof routeFile !== "function") { - if (!routeFile.fn) { - console.warn(`Missing fn handler in [${method}][${route}]`) - continue - } - - if (Array.isArray(routeFile.useContext)) { - let contexts = {} - - for (const context of routeFile.useContext) { - contexts[context] = this.contexts[context] - } - - routeFile.contexts = contexts - - routeFile.fn.bind({ contexts }) - } - } - - new Endpoint( - this, - { - route: route, - enabled: true, - middlewares: routeFile.middlewares, - handlers: { - [method]: routeFile.fn ?? routeFile, - } - } - ) - - continue - } - } - } - register = { http: (endpoint, ..._middlewares) => { // check and fix method @@ -336,6 +220,10 @@ class Server { let middlewares = [..._middlewares] if (endpoint.middlewares) { + if (!Array.isArray(endpoint.middlewares)) { + endpoint.middlewares = [endpoint.middlewares] + } + middlewares = [...middlewares, ...this.resolveMiddlewares(endpoint.middlewares)] } @@ -347,36 +235,6 @@ class Server { // register endpoint to http interface router this.engine.router[endpoint.method](endpoint.route, ...middlewares, endpoint.fn) }, - ws: (endpoint, ...execs) => { - endpoint.nsp = endpoint.nsp ?? "/main" - - this.websocket_instance.eventsChannels.push([endpoint.nsp, endpoint.on, endpoint.dispatch]) - - this.websocket_instance.map[endpoint.on] = { - nsp: endpoint.nsp, - channel: endpoint.on, - } - }, - } - - async registerBaseEndpoints() { - if (this.params.disableBaseEndpoint) { - console.warn("ā€¼ļø [disableBaseEndpoint] Base endpoint is disabled! Endpoints mapping will not be available, so linebridge client bridges will not work! ā€¼ļø") - return false - } - - const scanPath = path.join(__dirname, "baseEndpoints") - const files = fs.readdirSync(scanPath) - - for await (const file of files) { - if (file === "index.js") { - continue - } - - let endpoint = require(path.join(scanPath, file)).default - - new endpoint(this) - } } resolveMiddlewares = (requestedMiddlewares) => { @@ -385,7 +243,9 @@ class Server { ...defaults.middlewares, } - requestedMiddlewares = Array.isArray(requestedMiddlewares) ? requestedMiddlewares : [requestedMiddlewares] + if (typeof requestedMiddlewares === "string") { + requestedMiddlewares = [requestedMiddlewares] + } const execs = [] @@ -405,19 +265,6 @@ class Server { return execs } - - // Utilities - toogleEndpointReachability = (method, route, enabled) => { - if (typeof this.endpoints_map[method] !== "object") { - throw new Error(`Cannot toogle endpoint, method [${method}] not set!`) - } - - if (typeof this.endpoints_map[method][route] !== "object") { - throw new Error(`Cannot toogle endpoint [${route}], is not registered!`) - } - - this.endpoints_map[method][route].enabled = enabled ?? !this.endpoints_map[method][route].enabled - } } module.exports = Server \ No newline at end of file