support on exit

This commit is contained in:
SrGooglo 2025-02-26 20:24:50 +00:00
parent d7fa42e53d
commit 4fe16f1a81
3 changed files with 620 additions and 525 deletions

View File

@ -6,254 +6,287 @@ import { EventEmitter } from "@foxify/events"
import RedisMap from "../../lib/redis_map" import RedisMap from "../../lib/redis_map"
export default class RTEngineServer { export default class RTEngineServer {
constructor(params = {}) { constructor(params = {}) {
this.params = params this.params = params
this.clusterMode = !!cluster.isWorker this.clusterMode = !!cluster.isWorker
this.redisConnParams = { this.redisConnParams = {
host: this.params.redisOptions?.host ?? process.env.REDIS_HOST ?? "localhost", host:
port: this.params.redisOptions?.port ?? process.env.REDIS_PORT ?? 6379, this.params.redisOptions?.host ??
username: this.params.redisOptions?.username ?? (process.env.REDIS_AUTH && process.env.REDIS_AUTH.split(":")[0]), process.env.REDIS_HOST ??
password: this.params.redisOptions?.password ?? (process.env.REDIS_AUTH && process.env.REDIS_AUTH.split(":")[1]), "localhost",
db: this.params.redisOptions?.db ?? process.env.REDIS_DB ?? 0 port:
} this.params.redisOptions?.port ??
process.env.REDIS_PORT ??
6379,
username:
this.params.redisOptions?.username ??
(process.env.REDIS_AUTH &&
process.env.REDIS_AUTH.split(":")[0]),
password:
this.params.redisOptions?.password ??
(process.env.REDIS_AUTH &&
process.env.REDIS_AUTH.split(":")[1]),
db: this.params.redisOptions?.db ?? process.env.REDIS_DB ?? 0,
}
this.redis = params.redis this.redis = params.redis
this.io = params.io this.io = params.io
} }
worker_id = nanoid() worker_id = nanoid()
io = null io = null
redis = null redis = null
connections = null connections = null
users = null users = null
events = new Map() events = new Map()
async initialize() { async initialize() {
console.log("🌐 Initializing RTEngine server...") console.log("🌐 Initializing RTEngine server...")
if (!this.io) { if (!this.io) {
this.io = new SocketIO.Server({ this.io = new SocketIO.Server({
path: this.params.root ?? "/", path: this.params.root ?? "/",
}) })
} }
if (!this.redis) { if (!this.redis) {
this.redis = new redis({ this.redis = new redis({
host: this.redisConnParams.host, lazyConnect: true,
port: this.redisConnParams.port, host: this.redisConnParams.host,
username: this.redisConnParams.username, port: this.redisConnParams.port,
password: this.redisConnParams.password, username: this.redisConnParams.username,
db: this.redisConnParams.db, password: this.redisConnParams.password,
}) db: this.redisConnParams.db,
} maxRetriesPerRequest: null,
})
}
// create mappers await this.redis.connect()
this.connections = new RedisMap(this.redis, {
refKey: "connections",
worker_id: this.worker_id,
})
this.users = new RedisMap(this.redis, { // create mappers
refKey: "users", this.connections = new RedisMap(this.redis, {
worker_id: this.worker_id, refKey: "connections",
}) worker_id: this.worker_id,
})
// register middlewares this.users = new RedisMap(this.redis, {
if (typeof this.middlewares === "object" && Array.isArray(this.middlewares)) { refKey: "users",
for (const middleware of this.middlewares) { worker_id: this.worker_id,
this.io.use(middleware) })
}
}
// handle connection // register middlewares
this.io.on("connection", (socket) => { if (
this.eventHandler(this.onConnect, socket) typeof this.middlewares === "object" &&
}) Array.isArray(this.middlewares)
) {
for (const middleware of this.middlewares) {
this.io.use(middleware)
}
}
console.log(`[RTEngine] Listening...`) // handle connection
console.log(`[RTEngine] Universal worker id [${this.worker_id}]`) this.io.on("connection", (socket) => {
this.eventHandler(this.onConnect, socket)
})
return true console.log(`[RTEngine] Listening...`)
} console.log(`[RTEngine] Universal worker id [${this.worker_id}]`)
close = () => { return true
console.log(`Cleaning up RTEngine server...`) }
// WARN: Do not flush connections pls close = () => {
if (process.env.NODE_ENV !== "production") { console.log(`Cleaning up RTEngine server...`)
console.log(`Flushing previus connections... (Only for dev mode)`)
this.connections.flush()
}
if (this.clusterMode) { // WARN: Do not flush connections pls
this.connections.flush(cluster.worker.id) if (process.env.NODE_ENV !== "production") {
} console.log(`Flushing previus connections... (Only for dev mode)`)
this.connections.flush()
}
if (this.io) { if (this.clusterMode) {
this.io.close() this.connections.flush(cluster.worker.id)
} }
if (this.redis) { if (this.io) {
this.redis.quit() this.io.close()
} }
}
onConnect = async (socket) => { if (this.redis) {
console.log(`[RTEngine] new:client | id [${socket.id}]`) this.redis.quit()
}
}
// create eventBus onConnect = async (socket) => {
socket.eventBus = new EventEmitter() console.log(`[RTEngine] new:client | id [${socket.id}]`)
socket.pendingTimeouts = new Set()
// register events // create eventBus
if (typeof this.events === "object") { socket.eventBus = new EventEmitter()
for (const [key, handler] of this.events.entries()) { socket.pendingTimeouts = new Set()
socket.on(key, (...args) => {
this.eventHandler(handler, socket, ...args)
})
}
}
// handle ping // register events
socket.on("ping", () => { if (typeof this.events === "object") {
socket.emit("pong") console.log("registering events", this.events)
}) for (const [key, handler] of this.events.entries()) {
socket.on(key, (...args) => {
this.eventHandler(handler, socket, ...args)
})
}
}
// handle disconnect // handle ping
socket.on("disconnect", () => { socket.on("ping", () => {
this.eventHandler(this.onDisconnect, socket) socket.emit("pong")
}) })
await this.connections.set(socket.id, socket) // handle disconnect
socket.on("disconnect", () => {
this.eventHandler(this.onDisconnect, socket)
})
if (this.params.requireAuth) { await this.connections.set(socket.id, socket)
await this.onAuth(socket, null, (this.params.handleAuth ?? this.handleAuth))
} else if (socket.handshake.auth.token ?? socket.handshake.query.auth) {
await this.onAuth(socket, (socket.handshake.auth.token ?? socket.handshake.query.auth), (this.params.handleAuth ?? this.handleAuth))
}
}
onDisconnect = async (socket,) => { if (this.params.requireAuth) {
console.log(`[RTEngine] disconnect:client | id [${socket.id}]`) await this.onAuth(
socket,
null,
this.params.handleAuth ?? this.handleAuth,
)
} else if (socket.handshake.auth.token ?? socket.handshake.query.auth) {
await this.onAuth(
socket,
socket.handshake.auth.token ?? socket.handshake.query.auth,
this.params.handleAuth ?? this.handleAuth,
)
}
}
if (socket.eventBus.emit) { onDisconnect = async (socket) => {
socket.eventBus.emit("disconnect") console.log(`[RTEngine] disconnect:client | id [${socket.id}]`)
} else {
console.warn(`[${socket.id}][@${socket.userData.username}] Cannot emit disconnect event`)
}
const conn = await this.connections.get(socket.id) if (socket.eventBus.emit) {
socket.eventBus.emit("disconnect")
} else {
console.warn(
`[${socket.id}][@${socket.userData.username}] Cannot emit disconnect event`,
)
}
if (conn) { const conn = await this.connections.get(socket.id)
if (conn.user_id) {
await this.users.del(conn.user_id)
}
}
await this.connections.del(socket.id) if (conn) {
} if (conn.user_id) {
await this.users.del(conn.user_id)
}
}
onAuth = async (socket, token, handleAuth) => { await this.connections.del(socket.id)
if (typeof handleAuth !== "function") { }
console.log(`[RTEngine] [${socket.id}] No auth handler provided`)
return false
}
if (!token) { onAuth = async (socket, token, handleAuth) => {
if (socket.handshake.auth.token) { if (typeof handleAuth !== "function") {
token = socket.handshake.auth.token console.log(`[RTEngine] [${socket.id}] No auth handler provided`)
} return false
if (socket.handshake.query.auth) { }
token = socket.handshake.query.auth
}
}
function err(code, message) { if (!token) {
console.log(`[RTEngine] [${socket.id}] Auth error: ${code} >`, message) if (socket.handshake.auth.token) {
token = socket.handshake.auth.token
}
if (socket.handshake.query.auth) {
token = socket.handshake.query.auth
}
}
socket.emit("response:error", { function err(code, message) {
code, console.log(
message, `[RTEngine] [${socket.id}] Auth error: ${code} >`,
}) message,
)
socket.disconnect() socket.emit("response:error", {
code,
message,
})
return false socket.disconnect()
}
if (!token) { return false
return err(401, "auth:token_required") }
}
const authResult = await handleAuth(socket, token, err) if (!token) {
return err(401, "auth:token_required")
}
if (authResult) { const authResult = await handleAuth(socket, token, err)
const conn = await this.connections.has(socket.id)
// check if connection update is valid to avoid race condition(When user disconnect before auth verification is completed) if (authResult) {
if (!conn) { const conn = await this.connections.has(socket.id)
console.log(`Auth aborted`)
return false
}
this.users.set(authResult.user_id.toString(), { // check if connection update is valid to avoid race condition(When user disconnect before auth verification is completed)
socket_id: socket.id, if (!conn) {
...authResult, console.log(`Auth aborted`)
}) return false
}
socket.emit("response:auth:ok") this.users.set(authResult.user_id.toString(), {
socket_id: socket.id,
...authResult,
})
console.log(`[RTEngine] client:authenticated | socket_id [${socket.id}] | user_id [${authResult.user_id}] | username [@${authResult.username}]`) socket.emit("response:auth:ok")
}
}
eventHandler = async (fn, socket, payload) => { console.log(
try { `[RTEngine] client:authenticated | socket_id [${socket.id}] | user_id [${authResult.user_id}] | username [@${authResult.username}]`,
await fn(socket, payload, this) )
} catch (error) { }
console.error(error) }
if (typeof socket.emit === "function") { eventHandler = async (fn, socket, payload) => {
socket.emit("response:error", { try {
code: 500, await fn(socket, payload, this)
message: error.message, } catch (error) {
}) console.error(error)
}
}
}
find = { if (typeof socket.emit === "function") {
manyById: async (ids) => { socket.emit("response:error", {
if (typeof ids === "string") { code: 500,
ids = [ids] message: error.message,
} })
}
}
}
const users = await this.users.getMany(ids) find = {
manyById: async (ids) => {
if (typeof ids === "string") {
ids = [ids]
}
return users const users = await this.users.getMany(ids)
},
userBySocket: (socket_id) => {
}, return users
userById: async (user_id) => { },
const user = await this.users.get(user_id) userBySocket: (socket_id) => {},
userById: async (user_id) => {
const user = await this.users.get(user_id)
return user return user
}, },
socketByUserId: async (user_id) => { socketByUserId: async (user_id) => {
const user = await this.users.get(user_id) const user = await this.users.get(user_id)
if (!user) { if (!user) {
return null return null
} }
const socket = await this.connections.get(user.socket_id) const socket = await this.connections.get(user.socket_id)
return socket return socket
} },
} }
} }

View File

@ -2,140 +2,149 @@ import he from "hyper-express"
import rtengine from "../../classes/rtengine" import rtengine from "../../classes/rtengine"
export default class Engine { export default class Engine {
constructor(params) { constructor(params, ctx) {
this.params = params this.params = params
} this.ctx = ctx
}
app = null app = null
router = null router = null
ws = null ws = null
initialize = async (params) => { initialize = async (params) => {
const serverParams = { const serverParams = {
max_body_length: 50 * 1024 * 1024, //50MB in bytes, max_body_length: 50 * 1024 * 1024, //50MB in bytes,
} }
if (params.ssl) { if (params.ssl) {
serverParams.key_file_name = params.ssl?.key ?? null serverParams.key_file_name = params.ssl?.key ?? null
serverParams.cert_file_name = params.ssl?.cert ?? null serverParams.cert_file_name = params.ssl?.cert ?? null
} }
this.app = new he.Server(serverParams) this.app = new he.Server(serverParams)
this.router = new he.Router() this.router = new he.Router()
// create a router map // create a router map
if (typeof this.router.map !== "object") { if (typeof this.router.map !== "object") {
this.router.map = {} this.router.map = {}
} }
await this.router.any("*", (req, res) => { await this.router.any("*", (req, res) => {
return res.status(404).json({ return res.status(404).json({
code: 404, code: 404,
message: "Not found" message: "Not found",
}) })
}) })
await this.app.use(async (req, res, next) => { await this.app.use(async (req, res, next) => {
if (req.method === "OPTIONS") { if (req.method === "OPTIONS") {
// handle cors // handle cors
if (params.ignoreCors) { if (params.ignoreCors) {
res.setHeader("Access-Control-Allow-Methods", "*") res.setHeader("Access-Control-Allow-Methods", "*")
res.setHeader("Access-Control-Allow-Origin", "*") res.setHeader("Access-Control-Allow-Origin", "*")
res.setHeader("Access-Control-Allow-Headers", "*") res.setHeader("Access-Control-Allow-Headers", "*")
} }
return res.status(204).end() return res.status(204).end()
} }
// register body parser // register body parser
if (req.headers["content-type"]) { if (req.headers["content-type"]) {
if (!req.headers["content-type"].startsWith("multipart/form-data")) { if (
req.body = await req.urlencoded() !req.headers["content-type"].startsWith(
req.body = await req.json(req.body) "multipart/form-data",
} )
} ) {
}) req.body = await req.urlencoded()
req.body = await req.json(req.body)
}
}
})
if (params.enableWebsockets) { if (params.enableWebsockets) {
this.ws = global.websocket = new rtengine({ this.ws = global.websocket = new rtengine({
...params, ...params,
handleAuth: params.handleWsAuth, handleAuth: params.handleWsAuth,
root: `/${params.refName}` root: `/${params.refName}`,
}) })
this.ws.initialize() this.ws.initialize()
await this.ws.io.attachApp(this.app.uws_instance) await this.ws.io.attachApp(this.app.uws_instance)
} }
} }
listen = async (params) => { listen = async (params) => {
if (process.env.lb_service) { if (process.env.lb_service) {
let pathOverrides = Object.keys(this.router.map).map((key) => { let pathOverrides = Object.keys(this.router.map).map((key) => {
return key.split("/")[1] return key.split("/")[1]
}) })
// remove duplicates // remove duplicates
pathOverrides = [...new Set(pathOverrides)] pathOverrides = [...new Set(pathOverrides)]
// remove "" and _map // remove "" and _map
pathOverrides = pathOverrides.filter((key) => { pathOverrides = pathOverrides.filter((key) => {
if (key === "" || key === "_map") { if (key === "" || key === "_map") {
return false return false
} }
return true return true
}) })
if (params.enableWebsockets) { if (params.enableWebsockets) {
process.send({ process.send({
type: "router:ws:register", type: "router:ws:register",
id: process.env.lb_service.id, id: process.env.lb_service.id,
index: process.env.lb_service.index, index: process.env.lb_service.index,
data: { data: {
namespace: params.refName, namespace: params.refName,
listen: { listen: {
ip: this.params.listen_ip, ip: this.params.listen_ip,
port: this.params.listen_port, port: this.params.listen_port,
}, },
} },
}) })
} }
if (process.send) { if (process.send) {
// try to send router map to host // try to send router map to host
process.send({ process.send({
type: "router:register", type: "router:register",
id: process.env.lb_service.id, id: process.env.lb_service.id,
index: process.env.lb_service.index, index: process.env.lb_service.index,
data: { data: {
router_map: this.router.map, router_map: this.router.map,
path_overrides: pathOverrides, path_overrides: pathOverrides,
listen: { listen: {
ip: this.params.listen_ip, ip: this.params.listen_ip,
port: this.params.listen_port, port: this.params.listen_port,
}, },
} },
}) })
} }
} }
await this.app.listen(this.params.listen_port) await this.app.listen(this.params.listen_port)
} }
// close should be synchronous // close should be synchronous
close = () => { close = () => {
if (this.ws) { if (this.ws) {
this.ws.clear() this.ws.clear()
if (typeof this.ws?.close === "function") { if (typeof this.ws?.close === "function") {
this.ws.close() this.ws.close()
} }
} }
if (typeof this.app?.close === "function") { if (typeof this.app?.close === "function") {
this.app.close() this.app.close()
} }
}
if (typeof this.ctx.onClose === "function") {
this.ctx.onClose()
}
}
} }

View File

@ -14,280 +14,333 @@ import registerWebsocketsEvents from "./initializators/registerWebsocketsEvents"
import registerHttpRoutes from "./initializators/registerHttpRoutes" import registerHttpRoutes from "./initializators/registerHttpRoutes"
async function loadEngine(engine) { async function loadEngine(engine) {
const enginesPath = path.resolve(__dirname, "engines") const enginesPath = path.resolve(__dirname, "engines")
const selectedEnginePath = path.resolve(enginesPath, engine) const selectedEnginePath = path.resolve(enginesPath, engine)
if (!fs.existsSync(selectedEnginePath)) { if (!fs.existsSync(selectedEnginePath)) {
throw new Error(`Engine ${engine} not found!`) throw new Error(`Engine ${engine} not found!`)
} }
return require(selectedEnginePath).default return require(selectedEnginePath).default
} }
class Server { class Server {
constructor(params = {}, controllers = {}, middlewares = {}, headers = {}) { constructor(params = {}, controllers = {}, middlewares = {}, headers = {}) {
this.isExperimental = defaults.isExperimental ?? false this.isExperimental = defaults.isExperimental ?? false
if (this.isExperimental) { if (this.isExperimental) {
console.warn("\n🚧 This version of Linebridge is experimental! 🚧") console.warn("\n🚧 This version of Linebridge is experimental! 🚧")
console.warn(`Version: ${defaults.version}\n`) console.warn(`Version: ${defaults.version}\n`)
} }
this.params = { this.params = {
...defaults.params, ...defaults.params,
...params.default ?? params, ...(params.default ?? params),
} }
this.controllers = { this.controllers = {
...controllers.default ?? controllers, ...(controllers.default ?? controllers),
} }
this.middlewares = { this.middlewares = {
...middlewares.default ?? middlewares, ...(middlewares.default ?? middlewares),
} }
this.headers = { this.headers = {
...defaults.headers, ...defaults.headers,
...headers.default ?? headers, ...(headers.default ?? headers),
} }
// fix and fulfill params // fix and fulfill params
this.params.useMiddlewares = this.params.useMiddlewares ?? [] this.params.useMiddlewares = this.params.useMiddlewares ?? []
this.params.name = this.constructor.refName ?? this.params.refName
this.params.useEngine = this.constructor.useEngine ?? this.params.useEngine ?? "hyper-express"
this.params.listen_ip = this.constructor.listenIp ?? this.constructor.listen_ip ?? this.params.listen_ip ?? "0.0.0.0"
this.params.listen_port = this.constructor.listenPort ?? this.constructor.listen_port ?? this.params.listen_port ?? 3000
this.params.http_protocol = this.params.http_protocol ?? "http"
this.params.http_address = `${this.params.http_protocol}://${defaults.localhost_address}:${this.params.listen_port}`
this.params.enableWebsockets = this.constructor.enableWebsockets ?? this.params.enableWebsockets ?? false
this.params.ignoreCors = this.constructor.ignoreCors ?? this.params.ignoreCors ?? true
this.params.routesPath = this.constructor.routesPath ?? this.params.routesPath ?? path.resolve(process.cwd(), "routes") this.params.name = this.constructor.refName ?? this.params.refName
this.params.wsRoutesPath = this.constructor.wsRoutesPath ?? this.params.wsRoutesPath ?? path.resolve(process.cwd(), "routes_ws")
globalThis._linebridge = { this.params.useEngine =
name: this.params.name, this.constructor.useEngine ??
useEngine: this.params.useEngine, this.params.useEngine ??
listenIp: this.params.listen_ip, "hyper-express"
listenPort: this.params.listen_port,
httpProtocol: this.params.http_protocol,
httpAddress: this.params.http_address,
enableWebsockets: this.params.enableWebsockets,
ignoreCors: this.params.ignoreCors,
routesPath: this.params.routesPath,
validHttpMethods: defaults.valid_http_methods,
}
return this this.params.listen_ip =
} this.constructor.listenIp ??
this.constructor.listen_ip ??
this.params.listen_ip ??
"0.0.0.0"
engine = null this.params.listen_port =
this.constructor.listenPort ??
this.constructor.listen_port ??
this.params.listen_port ??
3000
events = null this.params.http_protocol = this.params.http_protocol ?? "http"
ipc = null this.params.http_address = `${this.params.http_protocol}://${defaults.localhost_address}:${this.params.listen_port}`
ipcEvents = null this.params.enableWebsockets =
this.constructor.enableWebsockets ??
this.params.enableWebsockets ??
false
eventBus = new EventEmitter() this.params.ignoreCors =
this.constructor.ignoreCors ?? this.params.ignoreCors ?? true
initialize = async () => { this.params.disableBaseEndpoints =
const startHrTime = process.hrtime() this.constructor.disableBaseEndpoints ??
this.params.disableBaseEndpoints ??
false
// register events this.params.routesPath =
if (this.events) { this.constructor.routesPath ??
if (this.events.default) { this.params.routesPath ??
this.events = this.events.default path.resolve(process.cwd(), "routes")
}
for (const [eventName, eventHandler] of Object.entries(this.events)) { this.params.wsRoutesPath =
this.eventBus.on(eventName, eventHandler) this.constructor.wsRoutesPath ??
} this.params.wsRoutesPath ??
} path.resolve(process.cwd(), "routes_ws")
const engineParams = { globalThis._linebridge = {
...this.params, name: this.params.name,
handleWsAuth: this.handleWsAuth, useEngine: this.params.useEngine,
handleAuth: this.handleHttpAuth, listenIp: this.params.listen_ip,
requireAuth: this.constructor.requireHttpAuth, listenPort: this.params.listen_port,
refName: this.constructor.refName ?? this.params.refName, httpProtocol: this.params.http_protocol,
ssl: this.ssl, httpAddress: this.params.http_address,
} enableWebsockets: this.params.enableWebsockets,
ignoreCors: this.params.ignoreCors,
routesPath: this.params.routesPath,
validHttpMethods: defaults.valid_http_methods,
}
// initialize engine return this
this.engine = await loadEngine(this.params.useEngine) }
this.engine = new this.engine(engineParams) engine = null
if (typeof this.engine.initialize === "function") { events = null
await this.engine.initialize(engineParams)
}
// check if ws events are defined ipc = null
if (typeof this.wsEvents !== "undefined") {
if (!this.engine.ws) {
console.warn("`wsEvents` detected, but Websockets are not enabled! Ignoring...")
} else {
for (const [eventName, eventHandler] of Object.entries(this.wsEvents)) {
this.engine.ws.events.set(eventName, eventHandler)
}
}
}
// try to execute onInitialize hook ipcEvents = null
if (typeof this.onInitialize === "function") {
try {
await this.onInitialize()
}
catch (err) {
console.error(err)
process.exit(1)
}
}
// set defaults eventBus = new EventEmitter()
this.useDefaultHeaders()
this.useDefaultMiddlewares()
if (this.routes) { initialize = async () => {
for (const [route, endpoint] of Object.entries(this.routes)) { const startHrTime = process.hrtime()
this.engine.router.map[route] = new Endpoint(
this,
{
...endpoint,
route: route,
handlers: {
[endpoint.method]: endpoint.fn,
},
}
)
}
}
// register http & ws routes // register events
this.engine = await registerHttpRoutes(this.params.routesPath, this.engine, this) if (this.events) {
this.engine = await registerWebsocketsEvents(this.params.wsRoutesPath, this.engine) if (this.events.default) {
this.events = this.events.default
}
// register base endpoints if enabled for (const [eventName, eventHandler] of Object.entries(
if (!this.params.disableBaseEndpoint) { this.events,
await registerBaseEndpoints(this) )) {
} this.eventBus.on(eventName, eventHandler)
}
}
// use main router const engineParams = {
await this.engine.app.use(this.engine.router) ...this.params,
handleWsAuth: this.handleWsAuth,
handleAuth: this.handleHttpAuth,
requireAuth: this.constructor.requireHttpAuth,
refName: this.constructor.refName ?? this.params.refName,
ssl: this.ssl,
}
// if is a linebridge service then initialize IPC Channels // initialize engine
if (process.env.lb_service) { this.engine = await loadEngine(this.params.useEngine)
await this.initializeIpc()
}
// try to execute beforeInitialize hook. this.engine = new this.engine(engineParams, this)
if (typeof this.afterInitialize === "function") {
await this.afterInitialize()
}
// listen if (typeof this.engine.initialize === "function") {
await this.engine.listen(engineParams) await this.engine.initialize(engineParams)
}
// calculate elapsed time on ms, to fixed 2 // check if ws events are defined
const elapsedHrTime = process.hrtime(startHrTime) if (typeof this.wsEvents !== "undefined") {
const elapsedTimeInMs = elapsedHrTime[0] * 1e3 + elapsedHrTime[1] / 1e6 if (!this.engine.ws) {
console.warn(
"`wsEvents` detected, but Websockets are not enabled! Ignoring...",
)
} else {
for (const [eventName, eventHandler] of Object.entries(
this.wsEvents,
)) {
this.engine.ws.events.set(eventName, eventHandler)
}
}
}
console.info(`🛰 Server ready!\n\t - ${this.params.http_protocol}://${this.params.listen_ip}:${this.params.listen_port} \n\t - Tooks ${elapsedTimeInMs.toFixed(2)}ms`) // try to execute onInitialize hook
} if (typeof this.onInitialize === "function") {
try {
await this.onInitialize()
} catch (err) {
console.error(err)
process.exit(1)
}
}
initializeIpc = async () => { // set defaults
console.info("🚄 Starting IPC client") this.useDefaultHeaders()
this.useDefaultMiddlewares()
this.ipc = global.ipc = new IPCClient(this, process) if (this.routes) {
} for (const [route, endpoint] of Object.entries(this.routes)) {
this.engine.router.map[route] = new Endpoint(this, {
...endpoint,
route: route,
handlers: {
[endpoint.method]: endpoint.fn,
},
})
}
}
useDefaultHeaders = () => { // register http & ws routes
this.engine.app.use((req, res, next) => { this.engine = await registerHttpRoutes(
Object.keys(this.headers).forEach((key) => { this.params.routesPath,
res.setHeader(key, this.headers[key]) this.engine,
}) this,
)
this.engine = await registerWebsocketsEvents(
this.params.wsRoutesPath,
this.engine,
)
next() // register base endpoints if enabled
}) if (!this.params.disableBaseEndpoints) {
} await registerBaseEndpoints(this)
}
useDefaultMiddlewares = async () => { // use main router
const middlewares = await this.resolveMiddlewares([ await this.engine.app.use(this.engine.router)
...this.params.useMiddlewares,
...this.useMiddlewares ?? [],
...defaults.useMiddlewares,
])
middlewares.forEach((middleware) => { // if is a linebridge service then initialize IPC Channels
this.engine.app.use(middleware) if (process.env.lb_service) {
}) await this.initializeIpc()
} }
register = { // try to execute beforeInitialize hook.
http: (endpoint, ..._middlewares) => { if (typeof this.afterInitialize === "function") {
// check and fix method await this.afterInitialize()
endpoint.method = endpoint.method?.toLowerCase() ?? "get" }
if (defaults.fixed_http_methods[endpoint.method]) { // listen
endpoint.method = defaults.fixed_http_methods[endpoint.method] await this.engine.listen(engineParams)
}
// check if method is supported // calculate elapsed time on ms, to fixed 2
if (typeof this.engine.router[endpoint.method] !== "function") { const elapsedHrTime = process.hrtime(startHrTime)
throw new Error(`Method [${endpoint.method}] is not supported!`) const elapsedTimeInMs = elapsedHrTime[0] * 1e3 + elapsedHrTime[1] / 1e6
}
// grab the middlewares console.info(
let middlewares = [..._middlewares] `🛰 Server ready!\n\t - ${this.params.http_protocol}://${this.params.listen_ip}:${this.params.listen_port} \n\t - Tooks ${elapsedTimeInMs.toFixed(2)}ms`,
)
}
if (endpoint.middlewares) { initializeIpc = async () => {
if (!Array.isArray(endpoint.middlewares)) { console.info("🚄 Starting IPC client")
endpoint.middlewares = [endpoint.middlewares]
}
middlewares = [...middlewares, ...this.resolveMiddlewares(endpoint.middlewares)] this.ipc = global.ipc = new IPCClient(this, process)
} }
this.engine.router.map[endpoint.route] = { useDefaultHeaders = () => {
method: endpoint.method, this.engine.app.use((req, res, next) => {
path: endpoint.route, Object.keys(this.headers).forEach((key) => {
} res.setHeader(key, this.headers[key])
})
// register endpoint to http interface router next()
this.engine.router[endpoint.method](endpoint.route, ...middlewares, endpoint.fn) })
}, }
}
resolveMiddlewares = (requestedMiddlewares) => { useDefaultMiddlewares = async () => {
const middlewares = { const middlewares = await this.resolveMiddlewares([
...this.middlewares, ...this.params.useMiddlewares,
...defaults.middlewares, ...(this.useMiddlewares ?? []),
} ...defaults.useMiddlewares,
])
if (typeof requestedMiddlewares === "string") { middlewares.forEach((middleware) => {
requestedMiddlewares = [requestedMiddlewares] this.engine.app.use(middleware)
} })
}
const execs = [] register = {
http: (endpoint, ..._middlewares) => {
// check and fix method
endpoint.method = endpoint.method?.toLowerCase() ?? "get"
requestedMiddlewares.forEach((middlewareKey) => { if (defaults.fixed_http_methods[endpoint.method]) {
if (typeof middlewareKey === "string") { endpoint.method = defaults.fixed_http_methods[endpoint.method]
if (typeof middlewares[middlewareKey] !== "function") { }
throw new Error(`Middleware ${middlewareKey} not found!`)
}
execs.push(middlewares[middlewareKey]) // check if method is supported
} if (typeof this.engine.router[endpoint.method] !== "function") {
throw new Error(`Method [${endpoint.method}] is not supported!`)
}
if (typeof middlewareKey === "function") { // grab the middlewares
execs.push(middlewareKey) let middlewares = [..._middlewares]
}
})
return execs if (endpoint.middlewares) {
} if (!Array.isArray(endpoint.middlewares)) {
endpoint.middlewares = [endpoint.middlewares]
}
middlewares = [
...middlewares,
...this.resolveMiddlewares(endpoint.middlewares),
]
}
this.engine.router.map[endpoint.route] = {
method: endpoint.method,
path: endpoint.route,
}
// register endpoint to http interface router
this.engine.router[endpoint.method](
endpoint.route,
...middlewares,
endpoint.fn,
)
},
}
resolveMiddlewares = (requestedMiddlewares) => {
const middlewares = {
...this.middlewares,
...defaults.middlewares,
}
if (typeof requestedMiddlewares === "string") {
requestedMiddlewares = [requestedMiddlewares]
}
const execs = []
requestedMiddlewares.forEach((middlewareKey) => {
if (typeof middlewareKey === "string") {
if (typeof middlewares[middlewareKey] !== "function") {
throw new Error(`Middleware ${middlewareKey} not found!`)
}
execs.push(middlewares[middlewareKey])
}
if (typeof middlewareKey === "function") {
execs.push(middlewareKey)
}
})
return execs
}
} }
module.exports = Server module.exports = Server