update to 0.16.0

This commit is contained in:
SrGooglo 2023-11-28 18:43:18 +00:00
parent 75251be768
commit 13b01b21d6
9 changed files with 779 additions and 129 deletions

View File

@ -1,4 +0,0 @@
{
"version": "0.15.12",
"fixedMainScript": "./client/index.js"
}

123
bootstrap.js vendored Normal file
View File

@ -0,0 +1,123 @@
require("dotenv").config()
const path = require("path")
const { webcrypto: crypto } = require("crypto")
const infisical = require("infisical-node")
const { registerBaseAliases } = require("./dist/server")
const EventEmitter = require("./dist/lib/event_emitter").default
global.isProduction = process.env.NODE_ENV === "production"
globalThis["__root"] = path.resolve(process.cwd())
globalThis["__src"] = path.resolve(globalThis["__root"], global.isProduction ? "dist" : "src")
const customAliases = {
"root": globalThis["__root"],
"src": globalThis["__src"],
"@shared-classes": path.resolve(globalThis["__src"], "_shared/classes"),
"@services": path.resolve(globalThis["__src"], "services"),
}
if (!global.isProduction) {
customAliases["comty.js"] = path.resolve(globalThis["__src"], "../../comty.js/src")
customAliases["@shared-classes"] = path.resolve(globalThis["__src"], "shared-classes")
}
if (process.env.USE_LINKED_SHARED) {
customAliases["@shared-classes"] = path.resolve(globalThis["__src"], "shared-classes")
}
registerBaseAliases(globalThis["__src"], customAliases)
// patches
const { Buffer } = require("buffer")
global.b64Decode = (data) => {
return Buffer.from(data, "base64").toString("utf-8")
}
global.b64Encode = (data) => {
return Buffer.from(data, "utf-8").toString("base64")
}
global.nanoid = (t = 21) => crypto.getRandomValues(new Uint8Array(t)).reduce(((t, e) => t += (e &= 63) < 36 ? e.toString(36) : e < 62 ? (e - 26).toString(36).toUpperCase() : e > 62 ? "-" : "_"), "");
global.eventBus = new EventEmitter()
Array.prototype.updateFromObjectKeys = function (obj) {
this.forEach((value, index) => {
if (obj[value] !== undefined) {
this[index] = obj[value]
}
})
return this
}
global.toBoolean = (value) => {
if (typeof value === "boolean") {
return value
}
if (typeof value === "string") {
return value.toLowerCase() === "true"
}
return false
}
async function injectEnvFromInfisical() {
const envMode = global.FORCE_ENV ?? global.isProduction ? "prod" : "dev"
console.log(`🔑 Injecting env variables from INFISICAL in [${envMode}] mode...`)
const client = new infisical({
token: process.env.INFISICAL_TOKEN,
})
const secrets = await client.getAllSecrets({
path: process.env.INFISICAL_PATH ?? "/",
environment: envMode,
attachToProcessEnv: false,
})
// inject to process.env
secrets.forEach((secret) => {
if (!(process.env[secret.secretName])) {
process.env[secret.secretName] = secret.secretValue
}
})
}
async function handleExit(code, e) {
if (code !== 0) {
console.log(`🚫 Unexpected exit >`, code, e)
}
await global.eventBus.awaitEmit("exit", code)
return process.exit(code)
}
async function main(api) {
if (!api) {
throw new Error("API is not defined")
}
if (process.env.INFISICAL_TOKEN) {
await injectEnvFromInfisical()
}
const instance = new api()
process.on("exit", handleExit)
process.on("SIGINT", handleExit)
process.on("uncaughtException", handleExit)
process.on("unhandledRejection", handleExit)
await instance.initialize()
return instance
}
module.exports = main

View File

@ -18,17 +18,24 @@
],
"license": "MIT",
"dependencies": {
"@foxify/events": "^2.1.0",
"@socket.io/cluster-adapter": "^0.2.2",
"@socket.io/redis-adapter": "^8.2.1",
"@socket.io/redis-emitter": "^5.1.0",
"@socket.io/sticky": "^1.0.4",
"axios": "^1.5.1",
"axios-retry": "3.4.0",
"cors": "2.8.5",
"express": "4.18.2",
"hyper-express": "6.5.5",
"infisical-node": "^1.5.0",
"ioredis": "^5.3.2",
"md5": "2.3.0",
"module-alias": "2.2.2",
"morgan": "1.10.0",
"socket.io": "4.5.4",
"socket.io": "^4.7.2",
"socket.io-client": "4.5.4",
"uuid": "9.0.0"
"uuid": "3.4.0"
},
"devDependencies": {
"@corenode/utils": "0.28.26",

View File

@ -0,0 +1,296 @@
import socketio from "socket.io"
import redis from "ioredis"
import EventEmitter from "@foxify/events"
import { createAdapter as createRedisAdapter } from "@socket.io/redis-adapter"
import { createAdapter as createClusterAdapter } from "@socket.io/cluster-adapter"
import { setupWorker } from "@socket.io/sticky"
import { Emitter } from "@socket.io/redis-emitter"
import http from "node:http"
import cluster from "node:cluster"
import RedisMap from "../../lib/redis_map"
export default class RTEngineServer {
constructor(params = {}) {
this.params = params
// servers
this.http = this.params.http ?? undefined
this.io = this.params.io ?? undefined
this.redis = this.params.redis ?? undefined
this.redisEmitter = null
this.clusterMode = !!cluster.isWorker
this.connections = null
this.users = null
}
onConnect = async (socket) => {
console.log(`🤝 New client connected on socket id [${socket.id}]`)
socket.eventEmitter = new EventEmitter()
if (typeof this.events === "object") {
for (const event in this.events) {
socket.on(event, (...args) => {
this.eventHandler(this.events[event], socket, ...args)
})
}
}
socket.on("disconnect", (_socket) => {
this.eventHandler(this.onDisconnect, socket)
})
const conn_obj = {
id: socket.id,
}
if (this.clusterMode) {
conn_obj.worker_id = cluster.worker.id
conn_obj._remote = true
this.redisEmitter.serverSideEmit(`redis:conn:set`, conn_obj)
}
await this.connections.set(conn_obj.id, conn_obj)
console.log(`⚙️ Awaiting authentication for client [${socket.id}]`)
if (this.params.requireAuth) {
await this.authenticateClient(socket, null, this.handleAuth ?? this.params.handleAuth)
} else if (socket.handshake.auth.token) {
await this.authenticateClient(socket, socket.handshake.auth.token, this.handleAuth ?? this.params.handleAuth)
}
if (process.env.NODE_ENV === "development") {
const connected_size = await this.connections.size()
console.log(`Total connected clients: ${connected_size}`)
}
}
onDisconnect = async (socket,) => {
console.log(`👋 Client disconnected on socket id [${socket.id}]`)
if (socket.eventEmitter.emit) {
socket.eventEmitter.emit("disconnect")
} else {
console.warn(`[${socket.id}][@${socket.userData.username}] Cannot emit disconnect event`)
}
const conn = await this.connections.get(socket.id)
if (conn) {
if (conn.user_id) {
await this.users.del(conn.user_id)
}
}
await this.connections.del(socket.id)
const connected_size = await this.connections.size()
console.log(`Total connected clients: ${connected_size}`)
}
authenticateClient = async (socket, token, handleAuth) => {
if (typeof handleAuth !== "function") {
console.warn(`Skipping authentication for client [${socket.id}] due no auth handler provided`)
return false
}
if (!token) {
if (socket.handshake.auth.token) {
token = socket.handshake.auth.token
}
}
function err(code, message) {
console.error(`🛑 Disconecting client [${socket.id}] cause an auth error >`, code, message)
socket.emit("response:error", {
code,
message,
})
socket.disconnect()
return false
}
if (!token) {
return err(401, "auth:token_required")
}
const authResult = await handleAuth(socket, token, err)
if (authResult) {
const conn = await this.connections.update(socket.id, authResult)
// check if connection update is valid to avoid race condition(When user disconnect before auth verification is completed)
if (!conn) {
console.log(`Auth aborted`)
return false
}
this.users.set(authResult.user_id, {
socket_id: socket.id,
...authResult,
})
socket.emit("response:auth:ok")
console.log(`✅ Authenticated client [${socket.id}] as [@${authResult.username}]`)
}
}
find = {
manyById: async (ids) => {
if (typeof ids === "string") {
ids = [ids]
}
const users = await this.users.getMany(ids)
return users
},
userBySocket: (socket_id) => {
},
userById: async (user_id) => {
const user = await this.users.get(user_id)
console.log(user)
return user
}
}
eventHandler = async (fn, socket, ...args) => {
try {
await fn(socket, ...args)
} catch (error) {
console.error(error)
if (socket.emit) {
socket.emit("response:error", {
code: 500,
message: error.message,
})
}
}
}
registerBaseEndpoints = (socket) => {
if (!socket) {
return socket
}
socket.on("ping", () => {
socket.emit("pong")
})
return socket
}
async initialize() {
console.log("🌐 Initializing RTEngine server...")
process.on("exit", this.cleanUp)
process.on("SIGINT", this.cleanUp)
process.on("SIGTERM", this.cleanUp)
process.on("SIGBREAK", this.cleanUp)
process.on("SIGHUP", this.cleanUp)
// create default servers
if (typeof this.redis === "undefined") {
this.redis = new redis(process.env.REDIS_HOST, process.env.REDIS_PORT, {
password: process.env.REDIS_PASSWORD,
db: process.env.REDIS_DB,
})
}
if (typeof this.http === "undefined") {
this.http = http.createServer()
}
if (typeof this.io === "undefined") {
this.io = new socketio.Server(this.http, {
cors: {
origin: "*",
methods: ["GET", "POST"],
credentials: true,
},
})
}
// create mappers
this.connections = new RedisMap(this.redis, {
refKey: "connections",
})
this.users = new RedisMap(this.redis, {
refKey: "users",
})
// setup clustered mode
if (this.clusterMode) {
console.log(`Connecting to redis as cluster worker id [${cluster.worker.id}]`)
this.io.adapter(createClusterAdapter())
const subClient = this.redis.duplicate()
this.io.adapter(createRedisAdapter(this.redis, subClient))
setupWorker(this.io)
this.redisEmitter = new Emitter(this.redis)
}
// WARN: Do not flush connections pls
if (process.env.NODE_ENV !== "production") {
console.log(`Flushing previus connections... (Only for dev mode)`)
await this.connections.flush()
}
// register middlewares
if (typeof this.middlewares === "object" && Array.isArray(this.middlewares)) {
for (const middleware of this.middlewares) {
this.io.use(middleware)
}
}
for (const event in this._redisEvents) {
this.io.on(event, this._redisEvents[event])
}
this.io.on("connection", (socket) => {
this.registerBaseEndpoints(socket)
this.eventHandler(this.onConnect, socket)
})
if (typeof this.onInit === "function") {
await this.onInit()
}
console.log(`✅ RTEngine server is running on port [${process.env.LISTEN_PORT}] ${this.clusterMode ? `on clustered mode [${cluster.worker.id}]` : ""}`)
return true
}
cleanUp = async () => {
console.log(`Cleaning up RTEngine server...`)
this.connections.flush(cluster.worker.id)
if (this.io) {
this.io.close()
}
}
}

View File

@ -54,6 +54,38 @@ if (process.env.LOG_REQUESTS === "true") {
global.DEFAULT_MIDDLEWARES.push(require("morgan")(process.env.MORGAN_FORMAT ?? ":method :url :status - :response-time ms"))
}
// patches
const { Buffer } = require("buffer")
global.b64Decode = (data) => {
return Buffer.from(data, "base64").toString("utf-8")
}
global.b64Encode = (data) => {
return Buffer.from(data, "utf-8").toString("base64")
}
Array.prototype.updateFromObjectKeys = function (obj) {
this.forEach((value, index) => {
if (obj[value] !== undefined) {
this[index] = obj[value]
}
})
return this
}
global.toBoolean = (value) => {
if (typeof value === "boolean") {
return value
}
if (typeof value === "string") {
return value.toLowerCase() === "true"
}
return false
}
function registerBaseAliases(fromPath, customAliases = {}) {
if (typeof fromPath === "undefined") {
if (module.parent.filename.includes("dist")) {
@ -79,4 +111,5 @@ module.exports = {
Server: require("./server.js"),
Controller: require("./classes/controller"),
Endpoint: require("./classes/endpoint"),
version: require("../../package.json").version,
}

View File

@ -0,0 +1,223 @@
export default class RedisMap {
constructor(redis, params = {}) {
if (!redis) {
throw new Error("redis client is required")
}
this.redis = redis
this.params = params
this.refKey = this.params.refKey
if (!this.refKey) {
throw new Error("refKey is required")
}
}
set = async (key, value) => {
if (!key) {
console.warn(`[redis:${this.refKey}] Failed to set entry with no key`)
return
}
if (!value) {
console.warn(`[redis:${this.refKey}] Failed to set entry [${key}] with no value`)
return
}
const redisKey = `${this.refKey}:${key}`
//console.log(`[redis:${this.refKey}] Setting entry [${key}]`,)
await this.redis.hset(redisKey, value)
return value
}
get = async (key, value) => {
if (!key) {
console.warn(`[redis:${this.refKey}] Failed to get entry with no key`)
return
}
const redisKey = `${this.refKey}:${key}`
let result = null
if (value) {
result = await this.redis.hget(redisKey, value)
} else {
result = await this.redis.hgetall(redisKey)
}
if (Object.keys(result).length === 0) {
result = null
}
return result
}
getMany = async (keys) => {
if (!keys) {
console.warn(`[redis:${this.refKey}] Failed to get entry with no key`)
return
}
const redisKeys = keys.map((key) => `${this.refKey}:${key}`)
const pipeline = this.redis.pipeline()
for (const redisKey of redisKeys) {
pipeline.hgetall(redisKey)
}
let results = await pipeline.exec()
results = results.map((result) => {
return result[1]
})
// delete null or empty objects
results = results.filter((result) => {
if (result === null) {
return false
}
if (Object.keys(result).length === 0) {
return false
}
return true
})
return results
}
del = async (key) => {
if (!key) {
console.warn(`[redis:${this.refKey}] Failed to delete entry with no key`)
return false
}
const redisKey = `${this.refKey}:${key}`
const data = await this.get(key)
if (!data) {
return false
}
await this.redis.hdel(redisKey, Object.keys(data))
return true
}
getAll = async () => {
let map = []
let nextIndex = 0
do {
const [nextIndexAsStr, results] = await this.redis.scan(
nextIndex,
"MATCH",
`${this.refKey}:*`,
"COUNT",
100
)
nextIndex = parseInt(nextIndexAsStr, 10)
map = map.concat(results)
} while (nextIndex !== 0)
return map
}
update = async (key, data) => {
if (!key) {
console.warn(`[redis:${this.refKey}] Failed to update entry with no key`)
return
}
const redisKey = `${this.refKey}:${key}`
let new_data = await this.get(key)
if (!new_data) {
console.warn(`[redis:${this.refKey}] Object [${key}] not exist, nothing to update`)
return false
}
new_data = {
...new_data,
...data,
}
await this.redis.hset(redisKey, new_data)
return new_data
}
flush = async (worker_id) => {
let nextIndex = 0
do {
const [nextIndexAsStr, results] = await this.redis.scan(
nextIndex,
"MATCH",
`${this.refKey}:*`,
"COUNT",
100
)
nextIndex = parseInt(nextIndexAsStr, 10)
const pipeline = this.redis.pipeline()
for await (const key of results) {
const key_id = key.split(this.refKey + ":")[1]
const data = await this.get(key_id)
if (!data) {
continue
}
if (worker_id) {
if (data.worker_id !== worker_id) {
continue
}
}
pipeline.hdel(key, Object.keys(data))
}
await pipeline.exec()
} while (nextIndex !== 0)
}
size = async () => {
let count = 0
let nextIndex = 0
do {
const [nextIndexAsStr, results] = await this.redis.scan(
nextIndex,
"MATCH",
`${this.refKey}:*`,
"COUNT",
100
)
nextIndex = parseInt(nextIndexAsStr, 10)
count = count + results.length
} while (nextIndex !== 0)
return count
}
}

View File

@ -1,28 +1,43 @@
const fs = require("fs")
const path = require("path")
const http = require("http")
const https = require("https")
const io = require("socket.io")
const pkgjson = require(path.resolve(process.cwd(), "package.json"))
const rtengine = require("./classes/RTEngine").default
const tokenizer = require("corenode/libs/tokenizer")
const { serverManifest, internalConsole } = require("./lib")
const HTTPProtocolsInstances = {
http: http,
https: https,
}
const pkgjson = require(path.resolve(process.cwd(), "package.json"))
const HTTPEngines = {
const Engines = {
"hyper-express": () => {
console.warn("HyperExpress is not fully supported yet!")
const engine = require("hyper-express")
return new engine.Server()
},
"express": () => {
return require("express")()
"express": (params) => {
const { createServer } = require("node:http")
const express = require("express")
const socketio = require("socket.io")
const app = express()
const http = createServer(app)
const io = new socketio.Server(http)
const ws = new rtengine({
...params,
io: io,
http: false,
})
app.use(express.json())
app.use(express.urlencoded({ extended: true }))
return {
ws,
http,
app,
}
},
}
@ -51,30 +66,11 @@ class Server {
// fix and fulfill params
this.params.listen_ip = this.params.listen_ip ?? "0.0.0.0"
this.params.listen_port = this.params.listen_port ?? 3000
this.params.listen_port = this.constructor.listen_port ?? this.params.listen_port ?? 3000
this.params.http_protocol = this.params.http_protocol ?? "http"
this.params.ws_protocol = this.params.ws_protocol ?? "ws"
this.params.http_address = `${this.params.http_protocol}://${global.LOCALHOST_ADDRESS}:${this.params.listen_port}`
this.params.ws_address = `${this.params.ws_protocol}://${global.LOCALHOST_ADDRESS}:${this.params.listen_port}`
// check if engine is supported
if (typeof HTTPProtocolsInstances[this.params.http_protocol]?.createServer !== "function") {
throw new Error("Invalid HTTP protocol (Missing createServer function)")
}
// create instances the 3 main instances of the server (Engine, HTTP, WebSocket)
this.engine_instance = global.engine_instance = HTTPEngines[this.params.engine]()
this.http_instance = global.http_instance = HTTPProtocolsInstances[this.params.http_protocol].createServer({
...this.params.httpOptions ?? {},
}, this.engine_instance)
this.websocket_instance = global.websocket_instance = {
io: new io.Server(this.http_instance),
map: {},
eventsChannels: [],
}
this.engine = null
this.InternalConsole = new internalConsole({
server_name: this.params.name
@ -94,13 +90,52 @@ class Server {
}
}
// handle exit events
process.on("SIGTERM", this.cleanupProcess)
process.on("SIGINT", this.cleanupProcess)
return this
}
initialize = async () => {
if (!this.params.minimal) {
this.InternalConsole.info(`🚀 Starting server...`)
}
// initialize engine
this.engine = global.engine = Engines[this.params.engine]({
...this.params,
handleAuth: this.handleWsAuth,
requireAuth: this.constructor.requireWSAuth,
})
if (typeof this.onInitialize === "function") {
await this.onInitialize()
}
//* set server defined headers
this.initializeHeaders()
//* set server defined middlewares
this.initializeRequiredMiddlewares()
//* register controllers
await this.initializeControllers()
//* register main index endpoint `/`
await this.registerBaseEndpoints()
if (typeof this.engine.ws?.initialize !== "function") {
console.warn("❌ WebSocket is not supported!")
} else {
await this.engine.ws.initialize()
}
await this.engine.http.listen(this.params.listen_port)
this.InternalConsole.info(`✅ Server ready on => ${this.params.listen_ip}:${this.params.listen_port}`)
if (!this.params.minimal) {
this.outputServerInfo()
}
}
initializeManifest = () => {
// check if origin.server exists
if (!fs.existsSync(serverManifest.filepath)) {
@ -127,38 +162,8 @@ class Server {
serverManifest.write({ last_start: Date.now() })
}
initialize = async () => {
if (!this.params.minimal) {
this.InternalConsole.info(`🚀 Starting server...`)
}
//* set server defined headers
this.initializeHeaders()
//* set server defined middlewares
this.initializeRequiredMiddlewares()
//* register controllers
await this.initializeControllers()
//* register main index endpoint `/`
await this.registerBaseEndpoints()
// initialize main socket
this.websocket_instance.io.on("connection", this.handleWSClientConnection)
// initialize http server
await this.http_instance.listen(this.params.listen_port, this.params.listen_ip ?? "0.0.0.0", () => {
this.InternalConsole.info(`✅ Server ready on => ${this.params.listen_ip}:${this.params.listen_port}`)
if (!this.params.minimal) {
this.outputServerInfo()
}
})
}
initializeHeaders = () => {
this.engine_instance.use((req, res, next) => {
this.engine.app.use((req, res, next) => {
Object.keys(this.headers).forEach((key) => {
res.setHeader(key, this.headers[key])
})
@ -172,7 +177,7 @@ class Server {
useMiddlewares.forEach((middleware) => {
if (typeof middleware === "function") {
this.engine_instance.use(middleware)
this.engine.app.use(middleware)
}
})
}
@ -201,9 +206,9 @@ class Server {
this.registerHTTPEndpoint(endpoint, ...this.resolveMiddlewares(controller.useMiddlewares))
})
WSEndpoints.forEach((endpoint) => {
this.registerWSEndpoint(endpoint)
})
// WSEndpoints.forEach((endpoint) => {
// this.registerWSEndpoint(endpoint)
// })
} catch (error) {
if (!global.silentOutputServerErrors) {
this.InternalConsole.error(`\n\x1b[41m\x1b[37m🆘 [${controller.refName ?? controller.name}] Controller initialization failed:\x1b[0m ${error.stack} \n`)
@ -221,7 +226,7 @@ class Server {
}
// check if method is supported
if (typeof this.engine_instance[endpoint.method] !== "function") {
if (typeof this.engine.app[endpoint.method] !== "function") {
throw new Error(`Method [${endpoint.method}] is not supported!`)
}
@ -241,7 +246,7 @@ class Server {
const routeModel = [endpoint.route, ...middlewares, this.createHTTPRequestHandler(endpoint)]
// register endpoint to http interface router
this.engine_instance[endpoint.method](...routeModel)
this.engine.app[endpoint.method](...routeModel)
// extend to map
this.endpoints_map[endpoint.method] = {
@ -270,8 +275,6 @@ class Server {
return false
}
//* register main index endpoint `/`
// this is the default endpoint, should return the server info and the map of all endpoints (http & ws)
this.registerHTTPEndpoint({
method: "get",
route: "/",
@ -281,8 +284,16 @@ class Server {
version: pkgjson.version ?? "unknown",
usid: this.usid,
requestTime: new Date().getTime(),
})
}
})
this.registerHTTPEndpoint({
method: "GET",
route: "/__http_map",
fn: (req, res) => {
return res.json({
endpointsMap: this.endpoints_map,
wsEndpointsMap: this.websocket_instance.map,
})
}
})
@ -315,13 +326,11 @@ class Server {
cleanupProcess = () => {
this.InternalConsole.log("🛑 Stopping server...")
if (typeof this.engine_instance.close === "function") {
this.engine_instance.close()
if (typeof this.engine.app.close === "function") {
this.engine.app.close()
}
this.websocket_instance.io.close()
process.exit(1)
this.engine.io.close()
}
// handlers
@ -355,52 +364,12 @@ class Server {
}
}
handleWSClientConnection = async (client) => {
client.res = (...args) => {
client.emit("response", ...args)
}
client.err = (...args) => {
client.emit("responseError", ...args)
}
if (typeof this.params.onWSClientConnection === "function") {
await this.params.onWSClientConnection(client)
}
for await (const [nsp, on, dispatch] of this.websocket_instance.eventsChannels) {
client.on(on, async (...args) => {
try {
await dispatch(client, ...args).catch((error) => {
client.err({
message: error.message,
})
})
} catch (error) {
client.err({
message: error.message,
})
}
})
}
client.on("ping", () => {
client.emit("pong")
})
client.on("disconnect", async () => {
if (typeof this.params.onWSClientDisconnect === "function") {
await this.params.onWSClientDisconnect(client)
}
})
}
// public methods
outputServerInfo = () => {
this.InternalConsole.table({
"linebridge_version": LINEBRIDGE_SERVER_VERSION,
"engine": this.params.engine,
"http_address": this.params.http_address,
"websocket_address": this.params.ws_address,
"address": this.params.http_address,
"listen_port": this.params.listen_port,
})
}

View File

@ -0,0 +1,3 @@
const { webcrypto: crypto } = require("crypto")
export default (t = 21) => crypto.getRandomValues(new Uint8Array(t)).reduce(((t, e) => t += (e &= 63) < 36 ? e.toString(36) : e < 62 ? (e - 26).toString(36).toUpperCase() : e > 62 ? "-" : "_"), "")