From bdbecb2857b321a78960039f9558875911aa5d81 Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Fri, 28 Mar 2025 18:30:41 +0000 Subject: [PATCH] improve gateway --- packages/server/gateway/index.js | 559 ++++++++------------ packages/server/gateway/services/manager.js | 149 ++++++ packages/server/gateway/services/service.js | 190 +++++++ 3 files changed, 553 insertions(+), 345 deletions(-) create mode 100644 packages/server/gateway/services/manager.js create mode 100644 packages/server/gateway/services/service.js diff --git a/packages/server/gateway/index.js b/packages/server/gateway/index.js index 8599cc8a..fe559a01 100644 --- a/packages/server/gateway/index.js +++ b/packages/server/gateway/index.js @@ -1,12 +1,9 @@ require("dotenv").config() import path from "node:path" -import Spinnies from "spinnies" -import { Observable } from "@gullerya/object-observer" -import { dots as DefaultSpinner } from "spinnies/spinners.json" import EventEmitter from "@foxify/events" +import { Observable } from "@gullerya/object-observer" import IPCRouter from "linebridge/dist/classes/IPCRouter" -import chokidar from "chokidar" import { onExit } from "signal-exit" import chalk from "chalk" import treeKill from "tree-kill" @@ -14,18 +11,21 @@ import treeKill from "tree-kill" import getIgnoredFiles from "./utils/getIgnoredFiles" import scanServices from "./utils/scanServices" import spawnService from "./utils/spawnService" - import Proxy from "./proxy" import RELP from "./repl" - import comtyAscii from "./ascii" import pkg from "../package.json" -const useLoadSpinner = process.argv.includes("--load-spinner") +import ServiceManager from "./services/manager" +import Service from "./services/service" + const isProduction = process.env.NODE_ENV === "production" +/** + * Gateway class - Main entry point for the service orchestrator + * Manages service discovery, spawning, and communication + */ export default class Gateway { - spinnies = new Spinnies() eventBus = new EventEmitter() state = { @@ -34,273 +34,224 @@ export default class Gateway { allReady: false, } - selectedProcessInstance = null - - instancePool = [] + selectedService = null + serviceManager = new ServiceManager() services = [] serviceRegistry = Observable.from({}) - serviceFileReference = {} proxy = null ipcRouter = null - async createServicesWatchers() { - for await (let service of this.services) { - const instanceFile = path.basename(service) - const instanceBasePath = path.dirname(service) - + /** + * Creates service registry entries based on discovered services + */ + async createServicesRegistry() { + for await (const servicePath of this.services) { + const instanceFile = path.basename(servicePath) + const instanceBasePath = path.dirname(servicePath) const servicePkg = require( path.resolve(instanceBasePath, "package.json"), ) - this.serviceFileReference[instanceFile] = servicePkg.name - this.serviceRegistry[servicePkg.name] = { - index: this.services.indexOf(service), + index: this.services.indexOf(servicePath), id: servicePkg.name, version: servicePkg.version, file: instanceFile, cwd: instanceBasePath, - buffer: [], ready: false, } } } - async createServicesProcess() { - for await (let service of this.services) { - const { id, version, cwd } = - this.serviceRegistry[ - this.serviceFileReference[path.basename(service)] - ] + /** + * Creates and initializes all service instances + */ + async createServiceInstances() { + for await (const servicePath of this.services) { + const instanceBasePath = path.dirname(servicePath) + const servicePkg = require( + path.resolve(instanceBasePath, "package.json"), + ) + const serviceId = servicePkg.name - this.serviceHandlers.onStarting(id) + const serviceConfig = { + id: serviceId, + version: servicePkg.version, + path: servicePath, + cwd: instanceBasePath, + isProduction, + internalIp: this.state.internalIp, + } - const instance = await spawnService({ - id, - service, - cwd, - onReload: this.serviceHandlers.onReload, - onClose: this.serviceHandlers.onClose, - onError: this.serviceHandlers.onError, - onIPCData: this.serviceHandlers.onIPCData, + // Create service instance + const service = new Service(serviceConfig, { + onReady: this.onServiceReady.bind(this), + onIPCData: this.onServiceIPCData.bind(this), + onServiceExit: this.onServiceExit.bind(this), }) - if (!useLoadSpinner) { - instance.logs.attach() - } + // Add to service manager + this.serviceManager.addService(service) - const serviceInstance = { - id, - version, - instance, - } + // Initialize service + await service.initialize() - // push to pool - this.instancePool.push(serviceInstance) - - // if is development, start a file watcher for hot-reload - if (!isProduction) { - const ignored = [ - ...(await getIgnoredFiles(cwd)), - "**/.cache/**", - "**/node_modules/**", - "**/dist/**", - "**/build/**", - ] - - const watcher = chokidar.watch(cwd, { - ignored: ignored, - persistent: true, - ignoreInitial: true, - }) - - watcher.on("all", (event, path) => { - // find instance from pool - const instanceIndex = this.instancePool.findIndex( - (instance) => instance.id === id, - ) - - console.log(event, path, instanceIndex) - - // reload - this.instancePool[instanceIndex].instance.reload() - }) - } + console.log(`šŸ“¦ [${serviceId}] Service initialized`) } } - serviceHandlers = { - onStarting: (id) => { - if (this.serviceRegistry[id].ready === false) { - if (useLoadSpinner) { - this.spinnies.add(id, { - text: `šŸ“¦ [${id}] Loading service...`, - spinner: DefaultSpinner, - }) - } - } - }, - onStarted: (id) => { - this.serviceRegistry[id].initialized = true + /** + * Handler for service ready event + * @param {Service} service - Service that is ready + */ + onServiceReady(service) { + const serviceId = service.id + this.serviceRegistry[serviceId].initialized = true + this.serviceRegistry[serviceId].ready = true - if (this.serviceRegistry[id].ready === false) { - if (useLoadSpinner) { - if (this.spinnies.pick(id)) { - this.spinnies.succeed(id, { - text: `[${id}][${this.serviceRegistry[id].index}] Ready`, - }) - } - } - } + console.log( + `āœ… [${serviceId}][${this.serviceRegistry[serviceId].index}] Ready`, + ) - this.serviceRegistry[id].ready = true - }, - onIPCData: async (id, msg) => { - if (msg.type === "log") { - console.log(`[${id}] ${msg.message}`) - } + // Check if all services are ready + this.checkAllServicesReady() + } - if (msg.status === "ready") { - await this.serviceHandlers.onStarted(id) - } + /** + * Handler for service IPC data + * @param {Service} service - Service sending the data + * @param {object} data - IPC data received + */ + async onServiceIPCData(service, data) { + const id = service.id - if (msg.type === "router:register") { - if (msg.data.path_overrides) { - for await (let pathOverride of msg.data.path_overrides) { - await this.proxy.register({ - serviceId: id, - path: `/${pathOverride}`, - target: `http://${this.state.internalIp}:${msg.data.listen.port}/${pathOverride}`, - pathRewrite: { - [`^/${pathOverride}`]: "", - }, - }) - } - } else { - await this.proxy.register({ - serviceId: id, - path: `/${id}`, - target: `http://${msg.data.listen.ip}:${msg.data.listen.port}`, - }) - } - } + if (data.type === "log") { + console.log(`[${id}] ${data.message}`) + } - if (msg.type === "router:ws:register") { - let target = `http://${this.state.internalIp}:${msg.data.listen_port ?? msg.data.listen?.port}` + if (data.status === "ready") { + this.onServiceReady(service) + } - if (!msg.data.ws_path && msg.data.namespace) { - target += `/${msg.data.namespace}` - } + if (data.type === "router:register") { + await this.handleRouterRegistration(service, data) + } - if (msg.data.ws_path && msg.data.ws_path !== "/") { - target += `/${msg.data.ws_path}` - } + if (data.type === "router:ws:register") { + await this.handleWebsocketRegistration(service, data) + } + } + /** + * Handler for service exit + * @param {Service} service - Service that exited + * @param {number} code - Exit code + * @param {Error} error - Error if any + */ + onServiceExit(service, code, error) { + const id = service.id + + this.serviceRegistry[id].initialized = true + this.serviceRegistry[id].ready = false + + console.log(`[${id}] Exit with code ${code}`) + + if (error) { + console.error(error) + } + + this.proxy.unregisterAllFromService(id) + } + + /** + * Handle router registration requests from services + * @param {Service} service - Service registering a route + * @param {object} msg - Registration message + */ + async handleRouterRegistration(service, msg) { + const id = service.id + + if (msg.data.path_overrides) { + for await (const pathOverride of msg.data.path_overrides) { await this.proxy.register({ serviceId: id, - path: `/${msg.data.namespace}`, - target: target, + path: `/${pathOverride}`, + target: `http://${this.state.internalIp}:${msg.data.listen.port}/${pathOverride}`, pathRewrite: { - [`^/${msg.data.namespace}`]: "", + [`^/${pathOverride}`]: "", }, - ws: true, }) } - }, - onReload: async ({ id, service, cwd }) => { - console.log(`[onReload] ${id} ${service}`) - - let instance = this.instancePool.find( - (instance) => instance.id === id, - ) - - if (!instance) { - console.error(`āŒ Service ${id} not found`) - return false - } - - // if (this.selectedProcessInstance) { - // if (this.selectedProcessInstance === "all") { - // this.std.detachAllServicesSTD() - // } else if (this.selectedProcessInstance.id === id) { - // this.selectedProcessInstance.instance.logs.detach() - // } - // } - - this.ipcRouter.unregister({ id, instance }) - - // try to unregister from proxy - this.proxy.unregisterAllFromService(id) - - await instance.instance.kill("SIGINT") - - instance.instance = await spawnService({ - id, - service, - cwd, - onReload: this.serviceHandlers.onReload, - onClose: this.serviceHandlers.onClose, - onError: this.serviceHandlers.onError, - onIPCData: this.serviceHandlers.onIPCData, + } else { + await this.proxy.register({ + serviceId: id, + path: `/${id}`, + target: `http://${msg.data.listen.ip}:${msg.data.listen.port}`, }) - - const instanceIndex = this.instancePool.findIndex( - (_instance) => _instance.id === id, - ) - - if (instanceIndex !== -1) { - this.instancePool[instanceIndex] = instance - } - - if (this.selectedProcessInstance) { - if (this.selectedProcessInstance === "all") { - this.std.attachAllServicesSTD() - } else if (this.selectedProcessInstance.id === id) { - this.std.attachServiceSTD(id) - } - } - }, - onClose: (id, code, err) => { - this.serviceRegistry[id].initialized = true - - if (this.serviceRegistry[id].ready === false) { - if (this.spinnies.pick(id)) { - this.spinnies.fail(id, { - text: `[${id}][${this.serviceRegistry[id].index}] Failed with code ${code}`, - }) - } - } - - console.log(`[${id}] Exit with code ${code}`) - - if (err) { - console.error(err) - } - - // try to unregister from proxy - this.proxy.unregisterAllFromService(id) - - this.serviceRegistry[id].ready = false - }, - onError: (id, err) => { - console.error(`[${id}] Error`, err) - }, - } - - onAllServicesReload = (id) => { - for (let instance of this.instancePool) { - instance.instance.reload() } } + /** + * Handle websocket registration requests from services + * @param {Service} service - Service registering a websocket + * @param {object} msg - Registration message + */ + async handleWebsocketRegistration(service, msg) { + const id = service.id + const listenPort = msg.data.listen_port ?? msg.data.listen?.port + let target = `http://${this.state.internalIp}:${listenPort}` + + if (!msg.data.ws_path && msg.data.namespace) { + target += `/${msg.data.namespace}` + } + + if (msg.data.ws_path && msg.data.ws_path !== "/") { + target += `/${msg.data.ws_path}` + } + + await this.proxy.register({ + serviceId: id, + path: `/${msg.data.namespace}`, + target: target, + pathRewrite: { + [`^/${msg.data.namespace}`]: "", + }, + ws: true, + }) + } + + /** + * Check if all services are ready and trigger the ready event + */ + checkAllServicesReady() { + if (this.state.allReady) return + + const allServicesInitialized = Object.values( + this.serviceRegistry, + ).every((service) => service.initialized) + + if (allServicesInitialized) { + this.onAllServicesReady() + } + } + + /** + * Reload all services + */ + reloadAllServices = () => { + this.serviceManager.reloadAllServices() + } + + /** + * Handle when all services are ready + */ onAllServicesReady = async () => { if (this.state.allReady) { return false } console.clear() - this.state.allReady = true console.log(comtyAscii) @@ -308,114 +259,30 @@ export default class Gateway { console.log(`USE: select , reload, exit`) await this.proxy.listen(this.state.proxyPort, this.state.internalIp) - - if (useLoadSpinner) { - if (!this.selectedProcessInstance) { - this.std.detachAllServicesSTD() - this.std.attachAllServicesSTD() - } - } } - onGatewayExit = (code, signal) => { + /** + * Clean up resources on gateway exit + */ + onGatewayExit = () => { console.clear() console.log(`\nšŸ›‘ Preparing to exit...`) - - console.log(`Stoping proxy...`) + console.log(`Stopping proxy...`) this.proxy.close() - - console.log(`Kill all ${this.instancePool.length} instances...`) - - for (let instance of this.instancePool) { - console.log(`Killing ${instance.id} [${instance.instance.pid}]`) - - instance.instance.kill() - - treeKill(instance.instance.pid) - } + console.log(`Stopping all services...`) + this.serviceManager.stopAllServices() treeKill(process.pid) } - std = { - reloadService: () => { - if (!this.selectedProcessInstance) { - console.error(`No service selected`) - return false - } - - if (this.selectedProcessInstance === "all") { - return this.onAllServicesReload() - } - - return this.selectedProcessInstance.instance.reload() - }, - findServiceById: (id) => { - if (!isNaN(parseInt(id))) { - // find by index number - id = this.serviceRegistry[Object.keys(this.serviceRegistry)[id]] - } else { - // find by id - id = this.serviceRegistry[id] - } - - return id - }, - attachServiceSTD: (id) => { - console.log(`Attaching service [${id}]`) - - if (id === "all") { - console.clear() - this.selectedProcessInstance = "all" - return this.std.attachAllServicesSTD() - } - - const service = this.std.findServiceById(id) - - if (!service) { - console.error(`Service [${service}] not found`) - return false - } - - this.selectedProcessInstance = this.instancePool.find( - (instance) => instance.id === service.id, - ) - - if (!this.selectedProcessInstance) { - this.selectedProcessInstance = null - - console.error( - `Cannot find service [${service.id}] in the instances pool`, - ) - - return false - } - - this.std.detachAllServicesSTD() - console.clear() - this.selectedProcessInstance.instance.logs.attach() - - return true - }, - dettachServiceSTD: (id) => {}, - attachAllServicesSTD: () => { - this.std.detachAllServicesSTD() - - for (let service of this.instancePool) { - service.instance.logs.attach() - } - }, - detachAllServicesSTD: () => { - for (let service of this.instancePool) { - service.instance.logs.detach() - } - }, - } - + /** + * Initialize the gateway and start all services + */ async initialize() { onExit(this.onGatewayExit) + // Increase limits to handle many services process.stdout.setMaxListeners(150) process.stderr.setMaxListeners(150) @@ -423,6 +290,12 @@ export default class Gateway { this.proxy = new Proxy() this.ipcRouter = new IPCRouter() + if (this.services.length === 0) { + console.error("āŒ No services found") + return process.exit(1) + } + + // Make key components available globally global.eventBus = this.eventBus global.ipcRouter = this.ipcRouter global.proxy = this.proxy @@ -433,41 +306,37 @@ export default class Gateway { `\nRunning ${chalk.bgBlue(`${pkg.name}`)} | ${chalk.bgMagenta(`[v${pkg.version}]`)} | ${this.state.internalIp} | ${isProduction ? "production" : "development"} \n\n\n`, ) - if (this.services.length === 0) { - console.error("āŒ No services found") - return process.exit(1) - } - console.log(`šŸ“¦ Found ${this.services.length} service(s)`) + // Watch for service state changes Observable.observe(this.serviceRegistry, (changes) => { - const { type } = changes[0] - - switch (type) { - case "update": { - if ( - Object.values(this.serviceRegistry).every( - (service) => service.initialized, - ) - ) { - this.onAllServicesReady() - } - - break - } - } + this.checkAllServicesReady() }) - await this.createServicesWatchers() - - await this.createServicesProcess() + await this.createServicesRegistry() + await this.createServiceInstances() + // Initialize REPL interface new RELP({ - attachAllServicesSTD: this.std.attachAllServicesSTD, - detachAllServicesSTD: this.std.detachAllServicesSTD, - attachServiceSTD: this.std.attachServiceSTD, - dettachServiceSTD: this.std.dettachServiceSTD, - reloadService: this.std.reloadService, + attachAllServicesSTD: () => + this.serviceManager.attachAllServicesStd(), + detachAllServicesSTD: () => + this.serviceManager.detachAllServicesStd(), + attachServiceSTD: (id) => this.serviceManager.attachServiceStd(id), + dettachServiceSTD: (id) => this.serviceManager.detachServiceStd(id), + reloadService: () => { + const selectedService = this.serviceManager.getSelectedService() + if (!selectedService) { + console.error(`No service selected`) + return false + } + + if (selectedService === "all") { + return this.reloadAllServices() + } + + return selectedService.reload() + }, onAllServicesReady: this.onAllServicesReady, }) } diff --git a/packages/server/gateway/services/manager.js b/packages/server/gateway/services/manager.js new file mode 100644 index 00000000..d429482d --- /dev/null +++ b/packages/server/gateway/services/manager.js @@ -0,0 +1,149 @@ +/** + * ServiceManager class - Manages a collection of services + * Provides methods to interact with multiple services + */ +export default class ServiceManager { + constructor() { + this.services = [] + this.selectedService = null + } + + /** + * Add a service to the manager + * @param {Service} service - Service to add + */ + addService(service) { + this.services.push(service) + } + + /** + * Get a service by ID + * @param {string} id - Service ID or index + * @returns {Service} The service or null if not found + */ + getService(id) { + // If ID is a number, treat it as an index + if (!isNaN(parseInt(id))) { + return this.services[parseInt(id)] || null + } + + // Otherwise look up by ID + return this.services.find((service) => service.id === id) || null + } + + /** + * Get the currently selected service + * @returns {Service|string|null} The selected service, "all", or null + */ + getSelectedService() { + return this.selectedService + } + + /** + * Set the currently selected service + * @param {string} id - Service ID, index, or "all" + * @returns {boolean} True if selection was successful + */ + selectService(id) { + if (id === "all") { + this.selectedService = "all" + return true + } + + const service = this.getService(id) + if (!service) { + console.error(`Service [${id}] not found`) + return false + } + + this.selectedService = service + return true + } + + /** + * Reload all services + */ + reloadAllServices() { + console.log("Reloading all services...") + + for (const service of this.services) { + service.reload() + } + } + + /** + * Stop all services + */ + stopAllServices() { + console.log("Stopping all services...") + + for (const service of this.services) { + service.stop() + } + } + + /** + * Attach to a specific service's standard output + * @param {string} id - Service ID or index + * @returns {boolean} True if attachment was successful + */ + attachServiceStd(id) { + console.log(`Attaching to service [${id}]`) + + if (id === "all") { + this.selectedService = "all" + this.attachAllServicesStd() + return true + } + + const service = this.getService(id) + + if (!service) { + console.error(`Service [${id}] not found`) + return false + } + + // Detach from all first + this.detachAllServicesStd() + + // Then attach to the selected one + console.clear() + service.attachStd() + this.selectedService = service + + return true + } + + /** + * Detach from a specific service's standard output + * @param {string} id - Service ID or index + */ + detachServiceStd(id) { + const service = this.getService(id) + if (service) { + service.detachStd() + } + } + + /** + * Attach to all services' standard output + */ + attachAllServicesStd() { + this.detachAllServicesStd() + + for (const service of this.services) { + service.attachStd() + } + + this.selectedService = "all" + } + + /** + * Detach from all services' standard output + */ + detachAllServicesStd() { + for (const service of this.services) { + service.detachStd() + } + } +} diff --git a/packages/server/gateway/services/service.js b/packages/server/gateway/services/service.js new file mode 100644 index 00000000..ea3a025a --- /dev/null +++ b/packages/server/gateway/services/service.js @@ -0,0 +1,190 @@ +import chokidar from "chokidar" +import path from "path" +import spawnService from "../utils/spawnService" +import getIgnoredFiles from "../utils/getIgnoredFiles" + +/** + * Service class - Represents a single service instance + * Manages the lifecycle, communication, and monitoring of a service + */ +export default class Service { + /** + * @param {object} config - Service configuration + * @param {string} config.id - Service identifier + * @param {string} config.version - Service version + * @param {string} config.path - Path to service entry file + * @param {string} config.cwd - Current working directory + * @param {boolean} config.isProduction - Whether running in production mode + * @param {string} config.internalIp - Internal IP address + * @param {object} handlers - Event handlers + * @param {Function} handlers.onReady - Called when service is ready + * @param {Function} handlers.onIPCData - Called when IPC data is received + * @param {Function} handlers.onServiceExit - Called when service exits + */ + constructor(config, handlers) { + this.id = config.id + this.version = config.version + this.path = config.path + this.cwd = config.cwd + this.isProduction = config.isProduction + this.internalIp = config.internalIp + + this.instance = null + this.fileWatcher = null + + this.handlers = handlers + } + + /** + * Initialize the service and start its process + */ + async initialize() { + await this.startProcess() + + if (!this.isProduction) { + await this.setupFileWatcher() + } + } + + /** + * Start the service process + */ + async startProcess() { + this.instance = await spawnService({ + id: this.id, + service: this.path, + cwd: this.cwd, + onReload: this.handleReload.bind(this), + onClose: this.handleClose.bind(this), + onError: this.handleError.bind(this), + onIPCData: this.handleIPCData.bind(this), + }) + + this.instance.logs.attach() + return this.instance + } + + /** + * Set up file watcher for development hot-reload + */ + async setupFileWatcher() { + const ignored = [ + ...(await getIgnoredFiles(this.cwd)), + "**/.cache/**", + "**/node_modules/**", + "**/dist/**", + "**/build/**", + ] + + this.fileWatcher = chokidar.watch(this.cwd, { + ignored, + persistent: true, + ignoreInitial: true, + }) + + this.fileWatcher.on("all", (event, filePath) => { + console.log(`[${this.id}] File changed: ${event} ${filePath}`) + this.reload() + }) + } + + /** + * Handle IPC data from the service + * @param {string} id - Service ID + * @param {object} data - IPC data + */ + handleIPCData(id, data) { + if (this.handlers.onIPCData) { + this.handlers.onIPCData(this, data) + } + } + + /** + * Handle service reload request + * @param {object} params - Reload parameters + */ + async handleReload(params) { + // The actual reload is handled by the reload() method + console.log(`[${this.id}] Handling reload request`) + } + + /** + * Handle service closure + * @param {string} id - Service ID + * @param {number} code - Exit code + * @param {Error} error - Error if any + */ + async handleClose(id, code, error) { + if (this.handlers.onServiceExit) { + await this.handlers.onServiceExit(this, code, error) + } + + // In production, we might want to restart the service + if (this.isProduction && code !== 0) { + console.error(`[${this.id}] Service crashed, restarting...`) + await new Promise((resolve) => setTimeout(resolve, 1000)) + await this.reload() + } + } + + /** + * Handle service errors + * @param {string} id - Service ID + * @param {Error} error - Error object + */ + handleError(id, error) { + console.error(`[${this.id}] Error:`, error) + } + + /** + * Reload the service + */ + async reload() { + console.log(`[${this.id}] Reloading service...`) + + // Kill the current process if is running + if (this.instance.exitCode !== null) { + await this.instance.kill("SIGINT") + } + + // Start a new process + await this.startProcess() + + return true + } + + /** + * Stop the service + */ + async stop() { + console.log(`[${this.id}] Stopping service...`) + + if (this.fileWatcher) { + await this.fileWatcher.close() + this.fileWatcher = null + } + + if (this.instance) { + await this.instance.kill("SIGINT") + this.instance = null + } + } + + /** + * Attach to service standard output + */ + attachStd() { + if (this.instance) { + this.instance.logs.attach() + } + } + + /** + * Detach from service standard output + */ + detachStd() { + if (this.instance) { + this.instance.logs.detach() + } + } +}