From 8ac78cbf5f67ec45632c216ab2ddaa6bd58343a0 Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Tue, 25 Mar 2025 23:04:38 +0000 Subject: [PATCH] support for rtengineng --- packages/server/classes/RTEngineNG/index.js | 152 ++++++++ .../server/classes/TaskQueueManager/index.js | 10 +- packages/server/gateway/index.js | 12 +- packages/server/gateway/proxy.js | 326 +++++++++--------- packages/server/lib/injectedAuth/index.js | 2 +- packages/server/package.json | 12 +- .../marketplace/routes/extensions/list/get.js | 7 + .../marketplace/utils/b2Upload/index.js | 86 +++++ .../notifications/notifications.service.js | 36 +- .../posts/classes/posts/methods/create.js | 10 +- .../posts/classes/posts/methods/delete.js | 20 +- .../posts/classes/posts/methods/toggleLike.js | 95 +++-- .../posts/classes/posts/methods/update.js | 18 +- .../posts/classes/posts/methods/votePoll.js | 79 +++-- .../server/services/posts/posts.service.js | 63 ++-- .../posts/routes_ws/connect_realtime.js | 4 - .../posts/routes_ws/disconnect_realtime.js | 4 - .../server/utils/flatRouteredFunctions.js | 23 ++ packages/server/utils/getRouteredFunctions.js | 24 ++ 19 files changed, 636 insertions(+), 347 deletions(-) create mode 100644 packages/server/classes/RTEngineNG/index.js create mode 100644 packages/server/services/marketplace/routes/extensions/list/get.js create mode 100644 packages/server/services/marketplace/utils/b2Upload/index.js delete mode 100644 packages/server/services/posts/routes_ws/connect_realtime.js delete mode 100644 packages/server/services/posts/routes_ws/disconnect_realtime.js create mode 100644 packages/server/utils/flatRouteredFunctions.js create mode 100644 packages/server/utils/getRouteredFunctions.js diff --git a/packages/server/classes/RTEngineNG/index.js b/packages/server/classes/RTEngineNG/index.js new file mode 100644 index 00000000..d016d3b6 --- /dev/null +++ b/packages/server/classes/RTEngineNG/index.js @@ -0,0 +1,152 @@ +import HyperExpress from "hyper-express" + +class RTEngineNG { + constructor(config = {}) { + this.events = new Map() + + if (typeof config.events === "object") { + for (const [event, handler] of Object.entries(config.events)) { + this.events.set(event, handler) + } + } + + this.onUpgrade = config.onUpgrade || null + this.onConnection = config.onConnection || null + this.onDisconnection = config.onDisconnection || null + } + + clients = new Set() + + router = new HyperExpress.Router() + + senders = { + broadcast: async (event, data) => { + for (const client of this.clients) { + this.sendMessage(client, event, data) + } + }, + } + + sendMessage = (socket, event, data) => { + const payload = JSON.stringify({ event, data }) + + socket.send(payload) + } + + sendToTopic = (socket, topic, event, data, self = false) => { + const payload = JSON.stringify({ + topic, + event, + data, + }) + + socket.publish(topic, payload) + + if (self === true) { + this.sendMessage(socket, event, data) + } + } + + sendError = (socket, error) => { + if (error instanceof Error) { + error = error.toString() + } + + this.sendMessage(socket, "error", error) + } + + handleMessage = async (socket, payload) => { + try { + const message = JSON.parse(payload) + + if (typeof message.event !== "string") { + return this.sendError(socket, "Invalid event type") + } + + const handler = this.events.get(message.event) + + if (typeof handler === "function") { + const handlerSenders = { + ...this.senders, + toTopic: (room, event, data, self) => { + this.sendToTopic(socket, room, event, data, self) + }, + send: (event, data) => { + this.sendMessage(socket, event, data) + }, + error: (error) => { + this.sendError(socket, error) + }, + } + + await handler(socket, message.data, handlerSenders) + } else { + this.sendError(socket, "Event handler not found") + } + } catch (error) { + this.sendError(socket, error) + } + } + + handleConnection = async (socket) => { + socket.on("close", () => this.handleDisconnection(socket)) + socket.on("message", (payload) => this.handleMessage(socket, payload)) + + if (this.onConnection) { + await this.onConnection(socket) + } + + this.clients.add(socket) + } + + handleDisconnection = async (socket) => { + if (this.onDisconnection) { + await this.onDisconnection(socket) + } + + this.clients.delete(socket) + } + + handleUpgrade = async (req, res) => { + try { + const context = { + id: nanoid(), + token: req.query.token, + user: null, + httpHeaders: req.headers, + } + + if (typeof this.onUpgrade === "function") { + res.upgrade(await this.onUpgrade(context, req.query.token)) + } else { + res.upgrade(context) + } + } catch (error) { + console.error("Error upgrading connection:", error) + res.status(401).end() + } + } + + registerEvent = (event, handler) => { + this.events.set(event, handler) + } + + registerEvents = (obj) => { + for (const [event, handler] of Object.entries(obj)) { + this.registerEvent(event, handler) + } + } + + initialize = async (engine) => { + this.engine = engine + + this.router.ws("/", this.handleConnection) + this.router.upgrade("/", this.handleUpgrade) + + this.engine.app.use("/", this.router) + + console.log(`✅ RTEngineNG initialized with ${this.events.size} events`) + } +} + +export default RTEngineNG diff --git a/packages/server/classes/TaskQueueManager/index.js b/packages/server/classes/TaskQueueManager/index.js index ad9f6141..0fbf8a69 100644 --- a/packages/server/classes/TaskQueueManager/index.js +++ b/packages/server/classes/TaskQueueManager/index.js @@ -3,13 +3,11 @@ import { Queue, Worker } from "bullmq" import { composeURL as composeRedisConnectionString } from "@shared-classes/RedisClient" export default class TaskQueueManager { - constructor(params, ctx) { + constructor(params) { if (!params) { throw new Error("Missing params") } - this.ctx = ctx - this.params = params this.queues = {} this.workers = {} @@ -36,17 +34,15 @@ export default class TaskQueueManager { } registerQueue = (queueObj, options) => { - const connection = this.ctx.engine.ws.redis - const queue = new Queue(queueObj.id, { - connection, + connection: options.redisOptions, defaultJobOptions: { removeOnComplete: true, }, }) const worker = new Worker(queueObj.id, queueObj.process, { - connection, + connection: options.redisOptions, concurrency: queueObj.maxJobs ?? 1, }) diff --git a/packages/server/gateway/index.js b/packages/server/gateway/index.js index dfbde092..8599cc8a 100644 --- a/packages/server/gateway/index.js +++ b/packages/server/gateway/index.js @@ -187,10 +187,20 @@ export default class Gateway { } if (msg.type === "router:ws:register") { + let target = `http://${this.state.internalIp}:${msg.data.listen_port ?? msg.data.listen?.port}` + + if (!msg.data.ws_path && msg.data.namespace) { + target += `/${msg.data.namespace}` + } + + if (msg.data.ws_path && msg.data.ws_path !== "/") { + target += `/${msg.data.ws_path}` + } + await this.proxy.register({ serviceId: id, path: `/${msg.data.namespace}`, - target: `http://${this.state.internalIp}:${msg.data.listen.port}/${msg.data.namespace}`, + target: target, pathRewrite: { [`^/${msg.data.namespace}`]: "", }, diff --git a/packages/server/gateway/proxy.js b/packages/server/gateway/proxy.js index a5ab5033..eeaad1e2 100644 --- a/packages/server/gateway/proxy.js +++ b/packages/server/gateway/proxy.js @@ -9,204 +9,214 @@ import https from "node:https" import fs from "node:fs" import path from "node:path" -function getHttpServerEngine(extraOptions = {}, handler = () => { }) { - const sslKey = path.resolve(process.cwd(), ".ssl", "privkey.pem") - const sslCert = path.resolve(process.cwd(), ".ssl", "cert.pem") +function getHttpServerEngine(extraOptions = {}, handler = () => {}) { + const sslKey = path.resolve(process.cwd(), ".ssl", "privkey.pem") + const sslCert = path.resolve(process.cwd(), ".ssl", "cert.pem") - if (fs.existsSync(sslKey) && fs.existsSync(sslCert)) { - return https.createServer( - { - key: fs.readFileSync(sslKey), - cert: fs.readFileSync(sslCert), - ...extraOptions - }, - handler - ) - } else { - return http.createServer(extraOptions, handler) - } + if (fs.existsSync(sslKey) && fs.existsSync(sslCert)) { + return https.createServer( + { + key: fs.readFileSync(sslKey), + cert: fs.readFileSync(sslCert), + ...extraOptions, + }, + handler, + ) + } else { + return http.createServer(extraOptions, handler) + } } export default class Proxy { - constructor() { - this.proxys = new Map() - this.wsProxys = new Map() + constructor() { + this.proxys = new Map() + this.wsProxys = new Map() - this.http = getHttpServerEngine({}, this.handleHttpRequest) - this.http.on("upgrade", this.handleHttpUpgrade) - } + this.http = getHttpServerEngine({}, this.handleHttpRequest) + this.http.on("upgrade", this.handleHttpUpgrade) + } - http = null + http = null - register = ({ serviceId, path, target, pathRewrite, ws } = {}) => { - if (!path) { - throw new Error("Path is required") - } + register = ({ serviceId, path, target, pathRewrite, ws } = {}) => { + if (!path) { + throw new Error("Path is required") + } - if (!target) { - throw new Error("Target is required") - } + if (!target) { + throw new Error("Target is required") + } - if (this.proxys.has(path)) { - console.warn(`Proxy already registered [${path}], skipping...`) - return false - } + if (this.proxys.has(path)) { + console.warn(`Proxy already registered [${path}], skipping...`) + return false + } - const proxy = httpProxy.createProxyServer({ - target: target, - }) + const proxy = httpProxy.createProxyServer({ + target: target, + }) - proxy.on("proxyReq", (proxyReq, req, res, options) => { - proxyReq.setHeader("x-linebridge-version", pkg.version) - proxyReq.setHeader("x-forwarded-for", req.socket.remoteAddress) - }) + proxy.on("proxyReq", (proxyReq, req, res, options) => { + proxyReq.setHeader("x-linebridge-version", pkg.version) + proxyReq.setHeader("x-forwarded-for", req.socket.remoteAddress) + }) - proxy.on("error", (e) => { - console.error(e) - }) + proxy.on("error", (e) => { + console.error(e) + }) - const proxyObj = { - serviceId: serviceId ?? "default_service", - path: path, - target: target, - pathRewrite: pathRewrite, - proxy: proxy, - } + const proxyObj = { + serviceId: serviceId ?? "default_service", + path: path, + target: target, + pathRewrite: pathRewrite, + proxy: proxy, + } - if (ws) { - console.log(`🔗 Registering websocket proxy [${path}] -> [${target}]`) - this.wsProxys.set(path, proxyObj) - } else { - console.log(`🔗 Registering path proxy [${path}] -> [${target}]`) - this.proxys.set(path, proxyObj) - } + if (ws) { + console.log( + `🔗 Registering websocket proxy [${path}] -> [${target}]`, + ) + this.wsProxys.set(path, proxyObj) + } else { + console.log(`🔗 Registering path proxy [${path}] -> [${target}]`) + this.proxys.set(path, proxyObj) + } - return true - } + return true + } - unregister = (path) => { - if (!this.proxys.has(path)) { - console.warn(`Proxy not registered [${path}], skipping...`) - return false - } + unregister = (path) => { + if (!this.proxys.has(path)) { + console.warn(`Proxy not registered [${path}], skipping...`) + return false + } - console.log(`🔗 Unregistering path proxy [${path}]`) + console.log(`🔗 Unregistering path proxy [${path}]`) - this.proxys.get(path).proxy.close() - this.proxys.delete(path) - } + this.proxys.get(path).proxy.close() + this.proxys.delete(path) + } - unregisterAllFromService = (serviceId) => { - this.proxys.forEach((value, key) => { - if (value.serviceId === serviceId) { - this.unregister(value.path) - } - }) - } + unregisterAllFromService = (serviceId) => { + this.proxys.forEach((value, key) => { + if (value.serviceId === serviceId) { + this.unregister(value.path) + } + }) + } - listen = async (port = 9000, host = "0.0.0.0", cb) => { - return await new Promise((resolve, reject) => { - this.http.listen(port, host, () => { - console.log(`🔗 Proxy listening on ${host}:${port}`) + listen = async (port = 9000, host = "0.0.0.0", cb) => { + return await new Promise((resolve, reject) => { + this.http.listen(port, host, () => { + console.log(`🔗 Proxy listening on ${host}:${port}`) - if (cb) { - cb(this) - } + if (cb) { + cb(this) + } - resolve(this) - }) - }) - } + resolve(this) + }) + }) + } - rewritePath = (rewriteConfig, path) => { - let result = path - const rules = [] + rewritePath = (rewriteConfig, path) => { + let result = path + const rules = [] - for (const [key, value] of Object.entries(rewriteConfig)) { - rules.push({ - regex: new RegExp(key), - value: value, - }) - } + for (const [key, value] of Object.entries(rewriteConfig)) { + rules.push({ + regex: new RegExp(key), + value: value, + }) + } - for (const rule of rules) { - if (rule.regex.test(path)) { - result = result.replace(rule.regex, rule.value) - break - } - } + for (const rule of rules) { + if (rule.regex.test(path)) { + result = result.replace(rule.regex, rule.value) + break + } + } - return result - } + return result + } - setCorsHeaders = (res) => { - res.setHeader("Access-Control-Allow-Origin", "*") - res.setHeader("Access-Control-Allow-Methods", "GET,HEAD,PUT,PATCH,POST,DELETE") - res.setHeader("Access-Control-Allow-Headers", "*") + setCorsHeaders = (res) => { + res.setHeader("Access-Control-Allow-Origin", "*") + res.setHeader( + "Access-Control-Allow-Methods", + "GET,HEAD,PUT,PATCH,POST,DELETE", + ) + res.setHeader("Access-Control-Allow-Headers", "*") - return res - } + return res + } - handleHttpRequest = (req, res) => { - res = this.setCorsHeaders(res) + handleHttpRequest = (req, res) => { + res = this.setCorsHeaders(res) - const sanitizedUrl = req.url.split("?")[0] + const sanitizedUrl = req.url.split("?")[0] - // preflight continue with code 204 - if (req.method === "OPTIONS") { - res.statusCode = 204 - res.end() - return - } + // preflight continue with code 204 + if (req.method === "OPTIONS") { + res.statusCode = 204 + res.end() + return + } - if (sanitizedUrl === "/") { - return res.end(JSON.stringify({ - name: pkg.name, - version: pkg.version, - lb_version: defaults.version - })) - } + if (sanitizedUrl === "/") { + return res.end( + JSON.stringify({ + name: pkg.name, + version: pkg.version, + lb_version: defaults.version, + }), + ) + } - const namespace = `/${sanitizedUrl.split("/")[1]}` - const route = this.proxys.get(namespace) + const namespace = `/${sanitizedUrl.split("/")[1]}` + const route = this.proxys.get(namespace) - if (!route) { - res.statusCode = 404 + if (!route) { + res.statusCode = 404 - res.end(JSON.stringify({ - error: "Gateway route not found", - details: "The gateway route you are trying to access does not exist, maybe the service is down...", - namespace: namespace, - })) + res.end( + JSON.stringify({ + error: "Gateway route not found", + details: + "The gateway route you are trying to access does not exist, maybe the service is down...", + namespace: namespace, + }), + ) - return null - } + return null + } - if (route.pathRewrite) { - req.url = this.rewritePath(route.pathRewrite, req.url) - } + if (route.pathRewrite) { + req.url = this.rewritePath(route.pathRewrite, req.url) + } - route.proxy.web(req, res) - } + route.proxy.web(req, res) + } - handleHttpUpgrade = (req, socket, head) => { - const namespace = `/${req.url.split("/")[1]}` - const route = this.wsProxys.get(namespace) + handleHttpUpgrade = (req, socket, head) => { + const namespace = `/${req.url.split("/")[1].split("?")[0]}` + const route = this.wsProxys.get(namespace) - if (!route) { - // destroy socket - socket.destroy() - return false - } + if (!route) { + // destroy socket + socket.destroy() + return false + } - if (route.pathRewrite) { - req.url = this.rewritePath(route.pathRewrite, req.url) - } + if (route.pathRewrite) { + req.url = this.rewritePath(route.pathRewrite, req.url) + } - route.proxy.ws(req, socket, head) - } + route.proxy.ws(req, socket, head) + } - close = () => { - this.http.close() - } -} \ No newline at end of file + close = () => { + this.http.close() + } +} diff --git a/packages/server/lib/injectedAuth/index.js b/packages/server/lib/injectedAuth/index.js index 6e17bd9b..267b2e33 100644 --- a/packages/server/lib/injectedAuth/index.js +++ b/packages/server/lib/injectedAuth/index.js @@ -20,10 +20,10 @@ export default async (obj, token) => { userData._id = userData._id.toString() + // inject to obj obj.user = userData obj.token = token obj.session = validation.data - obj.user = userData return obj } diff --git a/packages/server/package.json b/packages/server/package.json index 87cc78c3..e38a842b 100755 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -1,6 +1,6 @@ { "name": "@comty/server", - "version": "1.27.3@alpha", + "version": "1.28.0@alpha", "license": "ComtyLicense", "private": true, "workspaces": [ @@ -12,7 +12,6 @@ "build:bin": "cd build && pkg ./index.js" }, "dependencies": { - "@gullerya/object-observer": "^6.1.3", "@infisical/sdk": "^2.1.8", "@opentelemetry/api": "^1.9.0", "@opentelemetry/auto-instrumentations-node": "^0.56.1", @@ -20,16 +19,11 @@ "@sentry/node": "^9.4.0", "axios": "^1.7.4", "bcrypt": "^5.1.1", - "bull": "^4.16.5", "bullmq": "^5.41.5", - "chalk": "4.1.2", - "comty.js": "^0.60.3", - "dotenv": "^16.4.4", + "comty.js": "^0.61.0", "http-proxy": "^1.18.1", - "hyper-express": "^6.17.2", - "ioredis": "^5.4.1", "jsonwebtoken": "^9.0.2", - "linebridge": "^0.22.8", + "linebridge": "^0.24.0", "minio": "^8.0.1", "module-alias": "^2.2.3", "mongoose": "^8.5.3", diff --git a/packages/server/services/marketplace/routes/extensions/list/get.js b/packages/server/services/marketplace/routes/extensions/list/get.js new file mode 100644 index 00000000..679325da --- /dev/null +++ b/packages/server/services/marketplace/routes/extensions/list/get.js @@ -0,0 +1,7 @@ +import { Extension } from "@db_models" + +export default async (req) => { + const extensions = await Extension.find() + + return extensions +} diff --git a/packages/server/services/marketplace/utils/b2Upload/index.js b/packages/server/services/marketplace/utils/b2Upload/index.js new file mode 100644 index 00000000..82f80239 --- /dev/null +++ b/packages/server/services/marketplace/utils/b2Upload/index.js @@ -0,0 +1,86 @@ +import fs from "node:fs" +import path from "node:path" +import pMap from "p-map" + +export default async function b2Upload({ + source, + remotePath, + metadata = {}, + targetFilename, + isDirectory, + retryNumber = 0, +}) { + if (isDirectory) { + let files = await fs.promises.readdir(source) + + files = files.map((file) => { + const filePath = path.join(source, file) + + const isTargetDirectory = fs.lstatSync(filePath).isDirectory() + + return { + source: filePath, + remotePath: path.join(remotePath, file), + isDirectory: isTargetDirectory, + } + }) + + await pMap(files, b2Upload, { + concurrency: 5, + }) + + return { + id: remotePath, + url: `https://${process.env.B2_CDN_ENDPOINT}/${process.env.B2_BUCKET}/${remotePath}/${targetFilename}`, + metadata: metadata, + } + } + + try { + //await global.b2.authorize() + + if (!fs.existsSync(source)) { + throw new OperationError(500, "File not found") + } + + const uploadUrl = await global.b2.getUploadUrl({ + bucketId: process.env.B2_BUCKET_ID, + }) + + console.debug(`Uploading object to B2 Storage >`, { + source: source, + remote: remotePath, + }) + + const data = await fs.promises.readFile(source) + + await global.b2.uploadFile({ + uploadUrl: uploadUrl.data.uploadUrl, + uploadAuthToken: uploadUrl.data.authorizationToken, + fileName: remotePath, + data: data, + info: metadata, + }) + } catch (error) { + console.error(error) + + if (retryNumber < 5) { + return await b2Upload({ + source, + remotePath, + metadata, + targetFilename, + isDirectory, + retryNumber: retryNumber + 1, + }) + } + + throw new OperationError(500, "B2 upload failed") + } + + return { + id: remotePath, + url: `https://${process.env.B2_CDN_ENDPOINT}/${process.env.B2_BUCKET}/${remotePath}`, + metadata: metadata, + } +} diff --git a/packages/server/services/notifications/notifications.service.js b/packages/server/services/notifications/notifications.service.js index 42dbb10e..5cf2dbc3 100644 --- a/packages/server/services/notifications/notifications.service.js +++ b/packages/server/services/notifications/notifications.service.js @@ -6,27 +6,27 @@ import RedisClient from "@shared-classes/RedisClient" import SharedMiddlewares from "@shared-middlewares" class API extends Server { - static refName = "notifications" - static enableWebsockets = true - static wsRoutesPath = `${__dirname}/ws_routes` - static routesPath = `${__dirname}/routes` - static listen_port = process.env.HTTP_LISTEN_PORT ?? 3009 + static refName = "notifications" + static enableWebsockets = true + static wsRoutesPath = `${__dirname}/ws_routes` + static routesPath = `${__dirname}/routes` + static listen_port = process.env.HTTP_LISTEN_PORT ?? 3009 - middlewares = { - ...SharedMiddlewares - } + middlewares = { + ...SharedMiddlewares, + } - contexts = { - db: new DbManager(), - redis: RedisClient(), - } + contexts = { + db: new DbManager(), + redis: RedisClient(), + } - async onInitialize() { - await this.contexts.db.initialize() - await this.contexts.redis.initialize() - } + async onInitialize() { + await this.contexts.db.initialize() + await this.contexts.redis.initialize() + } - handleWsAuth = require("@shared-lib/handleWsAuth").default + handleWsAuth = require("@shared-lib/handleWsAuth").default } -Boot(API) \ No newline at end of file +Boot(API) diff --git a/packages/server/services/posts/classes/posts/methods/create.js b/packages/server/services/posts/classes/posts/methods/create.js index 7083634f..0b91c36f 100644 --- a/packages/server/services/posts/classes/posts/methods/create.js +++ b/packages/server/services/posts/classes/posts/methods/create.js @@ -89,18 +89,16 @@ export default async (payload = {}, req) => { // broadcast post to all users if (visibility === "public") { - global.websocket.io - .to("global:posts:realtime") - .emit(`post.new`, result[0]) + global.websocket.senders.toTopic("realtime:feed", "post:new", result[0]) } if (visibility === "private") { - const userSocket = await global.websocket.find.socketByUserId( + const userSockets = await global.websocket.find.clientsByUserId( post.user_id, ) - if (userSocket) { - userSocket.emit(`post.new`, result[0]) + for (const userSocket of userSockets) { + userSocket.emit(`post:new`, result[0]) } } diff --git a/packages/server/services/posts/classes/posts/methods/delete.js b/packages/server/services/posts/classes/posts/methods/delete.js index c64a9920..4895be6f 100644 --- a/packages/server/services/posts/classes/posts/methods/delete.js +++ b/packages/server/services/posts/classes/posts/methods/delete.js @@ -40,22 +40,22 @@ export default async (payload = {}) => { throw new OperationError(500, `An error has occurred: ${err.message}`) }) + // broadcast post to all users if (post.visibility === "public") { - global.websocket.io - .to("global:posts:realtime") - .emit(`post.delete`, post) - global.websocket.io - .to("global:posts:realtime") - .emit(`post.delete.${post_id}`, post) + global.websocket.senders.toTopic( + "realtime:feed", + "post:delete", + post_id, + ) } if (post.visibility === "private") { - const userSocket = await global.websocket.find.socketByUserId( + const userSockets = await global.websocket.find.clientsByUserId( post.user_id, ) - if (userSocket) { - userSocket.emit(`post.delete`, post_id) - userSocket.emit(`post.delete.${post_id}`, post_id) + + for (const userSocket of userSockets) { + userSocket.emit(`post:delete`, post_id) } } diff --git a/packages/server/services/posts/classes/posts/methods/toggleLike.js b/packages/server/services/posts/classes/posts/methods/toggleLike.js index 2981a93a..14031971 100644 --- a/packages/server/services/posts/classes/posts/methods/toggleLike.js +++ b/packages/server/services/posts/classes/posts/methods/toggleLike.js @@ -1,62 +1,59 @@ import { Post, PostLike } from "@db_models" export default async (payload = {}) => { - let { post_id, user_id, to } = payload + let { post_id, user_id, to } = payload - if (!post_id || !user_id) { - throw new OperationError(400, "Missing post_id or user_id") - } + if (!post_id || !user_id) { + throw new OperationError(400, "Missing post_id or user_id") + } - // check if post exist - let existPost = await Post.findOne({ - _id: post_id, - }) + // check if post exist + let existPost = await Post.findOne({ + _id: post_id, + }) - if (!existPost) { - throw new OperationError(404, "Post not found") - } + if (!existPost) { + throw new OperationError(404, "Post not found") + } - let likeObj = await PostLike.findOne({ - user_id, - post_id, - }) + let likeObj = await PostLike.findOne({ + user_id, + post_id, + }) - if (typeof to === "undefined") { - if (likeObj) { - to = false - } else { - to = true - } - } + if (typeof to === "undefined") { + if (likeObj) { + to = false + } else { + to = true + } + } - if (to) { - likeObj = new PostLike({ - post_id, - user_id, - }) + if (to) { + likeObj = new PostLike({ + post_id, + user_id, + }) - await likeObj.save() - } else { - await PostLike.findByIdAndDelete(likeObj._id) - } + await likeObj.save() + } else { + await PostLike.findByIdAndDelete(likeObj._id) + } - const count = await PostLike.countDocuments({ - post_id, - }) + const count = await PostLike.countDocuments({ + post_id, + }) - const eventData = { - to, - post_id, - user_id, - count: count, - } + const eventData = { + to, + post_id, + user_id, + count: count, + } - global.websocket.io.of("/").emit(`post.${post_id}.likes.update`, eventData) - global.websocket.io.of("/").emit(`post.like.update`, eventData) - - return { - post_id: post_id, - liked: to, - count: count - } -} \ No newline at end of file + return { + post_id: post_id, + liked: to, + count: count, + } +} diff --git a/packages/server/services/posts/classes/posts/methods/update.js b/packages/server/services/posts/classes/posts/methods/update.js index 9db0334d..26e3b891 100644 --- a/packages/server/services/posts/classes/posts/methods/update.js +++ b/packages/server/services/posts/classes/posts/methods/update.js @@ -37,22 +37,20 @@ export default async (post_id, update) => { }) if (post.visibility === "public") { - global.websocket.io - .to("global:posts:realtime") - .emit(`post.update`, result[0]) - global.websocket.io - .to("global:posts:realtime") - .emit(`post.update.${post_id}`, result[0]) + global.websocket.senders.toTopic( + "realtime:feed", + `post:update`, + result[0], + ) } if (post.visibility === "private") { - const userSocket = await global.websocket.find.socketByUserId( + const userSockets = await global.websocket.find.clientsByUserId( post.user_id, ) - if (userSocket) { - userSocket.emit(`post.update`, result[0]) - userSocket.emit(`post.update.${post_id}`, result[0]) + for (const userSocket of userSockets) { + userSocket.emit(`post:update`, result[0]) } } diff --git a/packages/server/services/posts/classes/posts/methods/votePoll.js b/packages/server/services/posts/classes/posts/methods/votePoll.js index 181e0fd6..61290fcc 100644 --- a/packages/server/services/posts/classes/posts/methods/votePoll.js +++ b/packages/server/services/posts/classes/posts/methods/votePoll.js @@ -1,46 +1,61 @@ -import { VotePoll } from "@db_models" +import { VotePoll, Post } from "@db_models" +import stage from "./stage" +// TODO: Implement logic to handle vote poll export default async (payload = {}) => { - if (!payload.user_id) { - throw new OperationError(400, "Missing user_id") - } + if (!payload.user_id) { + throw new OperationError(400, "Missing user_id") + } - if (!payload.post_id) { - throw new OperationError(400, "Missing post_id") - } + if (!payload.post_id) { + throw new OperationError(400, "Missing post_id") + } - if (!payload.option_id) { - throw new OperationError(400, "Missing option_id") - } + if (!payload.option_id) { + throw new OperationError(400, "Missing option_id") + } - let vote = await VotePoll.findOne({ - user_id: payload.user_id, - post_id: payload.post_id, - }) + let post = await Post.findOne({ + _id: payload.post_id, + }) - let previousOptionId = null + if (!post) { + throw new OperationError(404, "Post not found") + } - if (vote) { - previousOptionId = vote.option_id + let vote = await VotePoll.findOne({ + user_id: payload.user_id, + post_id: payload.post_id, + }) - await VotePoll.deleteOne({ - _id: vote._id.toString() - }) - } + let previousOptionId = null - vote = new VotePoll({ - user_id: payload.user_id, - post_id: payload.post_id, - option_id: payload.option_id, - }) + if (vote) { + previousOptionId = vote.option_id - await vote.save() + await VotePoll.deleteOne({ + _id: vote._id.toString(), + }) + } - vote = vote.toObject() + vote = new VotePoll({ + user_id: payload.user_id, + post_id: payload.post_id, + option_id: payload.option_id, + }) - vote.previous_option_id = previousOptionId + await vote.save() - global.websocket.io.of("/").emit(`post.poll.vote`, vote) + vote = vote.toObject() - return vote -} \ No newline at end of file + post = (await stage({ posts: post, for_user_id: payload.user_id }))[0] + + if (post.visibility === "public") { + global.websocket.senders.toTopic("realtime:feed", `post:update`, post) + } + + return { + post: post, + vote: vote, + } +} diff --git a/packages/server/services/posts/posts.service.js b/packages/server/services/posts/posts.service.js index 7ddda673..387bf0d3 100644 --- a/packages/server/services/posts/posts.service.js +++ b/packages/server/services/posts/posts.service.js @@ -3,39 +3,14 @@ import { Server } from "linebridge" import DbManager from "@shared-classes/DbManager" import RedisClient from "@shared-classes/RedisClient" import TaskQueueManager from "@shared-classes/TaskQueueManager" +import InjectedAuth from "@shared-lib/injectedAuth" import SharedMiddlewares from "@shared-middlewares" -// wsfast -import HyperExpress from "hyper-express" - -class WSFastServer { - router = new HyperExpress.Router() - - clients = new Set() - - routes = { - connect: async (socket) => { - console.log("Client connected", socket) - }, - } - - async initialize(engine) { - this.engine = engine - - Object.keys(this.routes).forEach((route) => { - this.router.ws(`/${route}`, this.routes[route]) - }) - - this.engine.app.use(this.router) - } -} - export default class API extends Server { static refName = "posts" + static useEngine = "hyper-express-ng" static enableWebsockets = true - static routesPath = `${__dirname}/routes` - static wsRoutesPath = `${__dirname}/routes_ws` static listen_port = process.env.HTTP_LISTEN_PORT ?? 3001 @@ -43,31 +18,43 @@ export default class API extends Server { ...SharedMiddlewares, } + handleWsUpgrade = async (context, token, res) => { + context = await InjectedAuth(context, token, res) + + if (!context.user) { + res.close(401, "Unauthorized or missing auth token") + return false + } + + return res.upgrade(context) + } + + handleWsConnection = (socket) => { + console.log(`[WS] @${socket.context.user.username} connected`) + } + + handleWsDisconnect = (socket) => { + console.log(`[WS] @${socket.context.user.username} disconnected`) + } + contexts = { db: new DbManager(), redis: RedisClient(), - ws: new WSFastServer(this.engine), } - queuesManager = new TaskQueueManager( - { - workersPath: `${__dirname}/queues`, - }, - this, - ) + queuesManager = new TaskQueueManager({ + workersPath: `${__dirname}/queues`, + }) async onInitialize() { await this.contexts.db.initialize() await this.contexts.redis.initialize() await this.queuesManager.initialize({ - redisOptions: this.engine.ws.redis.options, + redisOptions: this.contexts.redis.client.options, }) - await this.contexts.ws.initialize(this.engine) global.queues = this.queuesManager } - - handleWsAuth = require("@shared-lib/handleWsAuth").default } Boot(API) diff --git a/packages/server/services/posts/routes_ws/connect_realtime.js b/packages/server/services/posts/routes_ws/connect_realtime.js deleted file mode 100644 index f06abf0a..00000000 --- a/packages/server/services/posts/routes_ws/connect_realtime.js +++ /dev/null @@ -1,4 +0,0 @@ -export default async function (socket) { - console.log(`Socket ${socket.id} connected to realtime posts`) - socket.join("global:posts:realtime") -} diff --git a/packages/server/services/posts/routes_ws/disconnect_realtime.js b/packages/server/services/posts/routes_ws/disconnect_realtime.js deleted file mode 100644 index 90372242..00000000 --- a/packages/server/services/posts/routes_ws/disconnect_realtime.js +++ /dev/null @@ -1,4 +0,0 @@ -export default async function (socket) { - console.log(`Socket ${socket.id} disconnected from realtime posts`) - socket.leave("global:posts:realtime") -} diff --git a/packages/server/utils/flatRouteredFunctions.js b/packages/server/utils/flatRouteredFunctions.js new file mode 100644 index 00000000..80f55fb3 --- /dev/null +++ b/packages/server/utils/flatRouteredFunctions.js @@ -0,0 +1,23 @@ +// convert routered functions to flat routes, +// eg: { fn:1, nestedfn: { test: 2, test2: 3}} -> { fn:1, nestedfn:test: 2, nestedfn:test2: 3} + +export default function flatRouteredFunctions(obj, prefix = "", acc = {}) { + for (const key in obj) { + if (Object.prototype.hasOwnProperty.call(obj, key)) { + const value = obj[key] + // Determine the new key: if there's a prefix, add it with a colon separator. + const newKey = prefix ? `${prefix}:${key}` : key + // If value is a non-null object (and not an array), recursively flatten it. + if ( + value !== null && + typeof value === "object" && + !Array.isArray(value) + ) { + flatRouteredFunctions(value, newKey, acc) + } else { + acc[newKey] = value + } + } + } + return acc +} diff --git a/packages/server/utils/getRouteredFunctions.js b/packages/server/utils/getRouteredFunctions.js new file mode 100644 index 00000000..2d932046 --- /dev/null +++ b/packages/server/utils/getRouteredFunctions.js @@ -0,0 +1,24 @@ +import fs from "node:fs/promises" +import path from "node:path" + +export default async function getRouteredFunctions(dir) { + const files = await fs.readdir(dir) + + const result = {} + + for (const file of files) { + const filePath = path.join(dir, file) + const stat = await fs.stat(filePath) + + const eventName = path.basename(file).split(".")[0] + + if (stat.isFile()) { + const event = await import(filePath) + result[eventName] = event.default + } else if (stat.isDirectory()) { + result[eventName] = await getRouteredFunctions(filePath) + } + } + + return result +}