improve gateway

This commit is contained in:
SrGooglo 2025-03-28 18:30:41 +00:00
parent ab068d1a36
commit bdbecb2857
3 changed files with 553 additions and 345 deletions

View File

@ -1,12 +1,9 @@
require("dotenv").config()
import path from "node:path"
import Spinnies from "spinnies"
import { Observable } from "@gullerya/object-observer"
import { dots as DefaultSpinner } from "spinnies/spinners.json"
import EventEmitter from "@foxify/events"
import { Observable } from "@gullerya/object-observer"
import IPCRouter from "linebridge/dist/classes/IPCRouter"
import chokidar from "chokidar"
import { onExit } from "signal-exit"
import chalk from "chalk"
import treeKill from "tree-kill"
@ -14,18 +11,21 @@ import treeKill from "tree-kill"
import getIgnoredFiles from "./utils/getIgnoredFiles"
import scanServices from "./utils/scanServices"
import spawnService from "./utils/spawnService"
import Proxy from "./proxy"
import RELP from "./repl"
import comtyAscii from "./ascii"
import pkg from "../package.json"
const useLoadSpinner = process.argv.includes("--load-spinner")
import ServiceManager from "./services/manager"
import Service from "./services/service"
const isProduction = process.env.NODE_ENV === "production"
/**
* Gateway class - Main entry point for the service orchestrator
* Manages service discovery, spawning, and communication
*/
export default class Gateway {
spinnies = new Spinnies()
eventBus = new EventEmitter()
state = {
@ -34,140 +34,146 @@ export default class Gateway {
allReady: false,
}
selectedProcessInstance = null
instancePool = []
selectedService = null
serviceManager = new ServiceManager()
services = []
serviceRegistry = Observable.from({})
serviceFileReference = {}
proxy = null
ipcRouter = null
async createServicesWatchers() {
for await (let service of this.services) {
const instanceFile = path.basename(service)
const instanceBasePath = path.dirname(service)
/**
* Creates service registry entries based on discovered services
*/
async createServicesRegistry() {
for await (const servicePath of this.services) {
const instanceFile = path.basename(servicePath)
const instanceBasePath = path.dirname(servicePath)
const servicePkg = require(
path.resolve(instanceBasePath, "package.json"),
)
this.serviceFileReference[instanceFile] = servicePkg.name
this.serviceRegistry[servicePkg.name] = {
index: this.services.indexOf(service),
index: this.services.indexOf(servicePath),
id: servicePkg.name,
version: servicePkg.version,
file: instanceFile,
cwd: instanceBasePath,
buffer: [],
ready: false,
}
}
}
async createServicesProcess() {
for await (let service of this.services) {
const { id, version, cwd } =
this.serviceRegistry[
this.serviceFileReference[path.basename(service)]
]
/**
* Creates and initializes all service instances
*/
async createServiceInstances() {
for await (const servicePath of this.services) {
const instanceBasePath = path.dirname(servicePath)
const servicePkg = require(
path.resolve(instanceBasePath, "package.json"),
)
const serviceId = servicePkg.name
this.serviceHandlers.onStarting(id)
const instance = await spawnService({
id,
service,
cwd,
onReload: this.serviceHandlers.onReload,
onClose: this.serviceHandlers.onClose,
onError: this.serviceHandlers.onError,
onIPCData: this.serviceHandlers.onIPCData,
})
if (!useLoadSpinner) {
instance.logs.attach()
const serviceConfig = {
id: serviceId,
version: servicePkg.version,
path: servicePath,
cwd: instanceBasePath,
isProduction,
internalIp: this.state.internalIp,
}
const serviceInstance = {
id,
version,
instance,
}
// push to pool
this.instancePool.push(serviceInstance)
// if is development, start a file watcher for hot-reload
if (!isProduction) {
const ignored = [
...(await getIgnoredFiles(cwd)),
"**/.cache/**",
"**/node_modules/**",
"**/dist/**",
"**/build/**",
]
const watcher = chokidar.watch(cwd, {
ignored: ignored,
persistent: true,
ignoreInitial: true,
// Create service instance
const service = new Service(serviceConfig, {
onReady: this.onServiceReady.bind(this),
onIPCData: this.onServiceIPCData.bind(this),
onServiceExit: this.onServiceExit.bind(this),
})
watcher.on("all", (event, path) => {
// find instance from pool
const instanceIndex = this.instancePool.findIndex(
(instance) => instance.id === id,
// Add to service manager
this.serviceManager.addService(service)
// Initialize service
await service.initialize()
console.log(`📦 [${serviceId}] Service initialized`)
}
}
/**
* Handler for service ready event
* @param {Service} service - Service that is ready
*/
onServiceReady(service) {
const serviceId = service.id
this.serviceRegistry[serviceId].initialized = true
this.serviceRegistry[serviceId].ready = true
console.log(
`✅ [${serviceId}][${this.serviceRegistry[serviceId].index}] Ready`,
)
console.log(event, path, instanceIndex)
// reload
this.instancePool[instanceIndex].instance.reload()
})
// Check if all services are ready
this.checkAllServicesReady()
}
/**
* Handler for service IPC data
* @param {Service} service - Service sending the data
* @param {object} data - IPC data received
*/
async onServiceIPCData(service, data) {
const id = service.id
if (data.type === "log") {
console.log(`[${id}] ${data.message}`)
}
if (data.status === "ready") {
this.onServiceReady(service)
}
if (data.type === "router:register") {
await this.handleRouterRegistration(service, data)
}
if (data.type === "router:ws:register") {
await this.handleWebsocketRegistration(service, data)
}
}
serviceHandlers = {
onStarting: (id) => {
if (this.serviceRegistry[id].ready === false) {
if (useLoadSpinner) {
this.spinnies.add(id, {
text: `📦 [${id}] Loading service...`,
spinner: DefaultSpinner,
})
}
}
},
onStarted: (id) => {
/**
* Handler for service exit
* @param {Service} service - Service that exited
* @param {number} code - Exit code
* @param {Error} error - Error if any
*/
onServiceExit(service, code, error) {
const id = service.id
this.serviceRegistry[id].initialized = true
this.serviceRegistry[id].ready = false
if (this.serviceRegistry[id].ready === false) {
if (useLoadSpinner) {
if (this.spinnies.pick(id)) {
this.spinnies.succeed(id, {
text: `[${id}][${this.serviceRegistry[id].index}] Ready`,
})
}
}
console.log(`[${id}] Exit with code ${code}`)
if (error) {
console.error(error)
}
this.serviceRegistry[id].ready = true
},
onIPCData: async (id, msg) => {
if (msg.type === "log") {
console.log(`[${id}] ${msg.message}`)
this.proxy.unregisterAllFromService(id)
}
if (msg.status === "ready") {
await this.serviceHandlers.onStarted(id)
}
/**
* Handle router registration requests from services
* @param {Service} service - Service registering a route
* @param {object} msg - Registration message
*/
async handleRouterRegistration(service, msg) {
const id = service.id
if (msg.type === "router:register") {
if (msg.data.path_overrides) {
for await (let pathOverride of msg.data.path_overrides) {
for await (const pathOverride of msg.data.path_overrides) {
await this.proxy.register({
serviceId: id,
path: `/${pathOverride}`,
@ -186,8 +192,15 @@ export default class Gateway {
}
}
if (msg.type === "router:ws:register") {
let target = `http://${this.state.internalIp}:${msg.data.listen_port ?? msg.data.listen?.port}`
/**
* Handle websocket registration requests from services
* @param {Service} service - Service registering a websocket
* @param {object} msg - Registration message
*/
async handleWebsocketRegistration(service, msg) {
const id = service.id
const listenPort = msg.data.listen_port ?? msg.data.listen?.port
let target = `http://${this.state.internalIp}:${listenPort}`
if (!msg.data.ws_path && msg.data.namespace) {
target += `/${msg.data.namespace}`
@ -207,100 +220,38 @@ export default class Gateway {
ws: true,
})
}
},
onReload: async ({ id, service, cwd }) => {
console.log(`[onReload] ${id} ${service}`)
let instance = this.instancePool.find(
(instance) => instance.id === id,
)
/**
* Check if all services are ready and trigger the ready event
*/
checkAllServicesReady() {
if (this.state.allReady) return
if (!instance) {
console.error(`❌ Service ${id} not found`)
return false
}
const allServicesInitialized = Object.values(
this.serviceRegistry,
).every((service) => service.initialized)
// if (this.selectedProcessInstance) {
// if (this.selectedProcessInstance === "all") {
// this.std.detachAllServicesSTD()
// } else if (this.selectedProcessInstance.id === id) {
// this.selectedProcessInstance.instance.logs.detach()
// }
// }
this.ipcRouter.unregister({ id, instance })
// try to unregister from proxy
this.proxy.unregisterAllFromService(id)
await instance.instance.kill("SIGINT")
instance.instance = await spawnService({
id,
service,
cwd,
onReload: this.serviceHandlers.onReload,
onClose: this.serviceHandlers.onClose,
onError: this.serviceHandlers.onError,
onIPCData: this.serviceHandlers.onIPCData,
})
const instanceIndex = this.instancePool.findIndex(
(_instance) => _instance.id === id,
)
if (instanceIndex !== -1) {
this.instancePool[instanceIndex] = instance
}
if (this.selectedProcessInstance) {
if (this.selectedProcessInstance === "all") {
this.std.attachAllServicesSTD()
} else if (this.selectedProcessInstance.id === id) {
this.std.attachServiceSTD(id)
}
}
},
onClose: (id, code, err) => {
this.serviceRegistry[id].initialized = true
if (this.serviceRegistry[id].ready === false) {
if (this.spinnies.pick(id)) {
this.spinnies.fail(id, {
text: `[${id}][${this.serviceRegistry[id].index}] Failed with code ${code}`,
})
if (allServicesInitialized) {
this.onAllServicesReady()
}
}
console.log(`[${id}] Exit with code ${code}`)
if (err) {
console.error(err)
}
// try to unregister from proxy
this.proxy.unregisterAllFromService(id)
this.serviceRegistry[id].ready = false
},
onError: (id, err) => {
console.error(`[${id}] Error`, err)
},
}
onAllServicesReload = (id) => {
for (let instance of this.instancePool) {
instance.instance.reload()
}
/**
* Reload all services
*/
reloadAllServices = () => {
this.serviceManager.reloadAllServices()
}
/**
* Handle when all services are ready
*/
onAllServicesReady = async () => {
if (this.state.allReady) {
return false
}
console.clear()
this.state.allReady = true
console.log(comtyAscii)
@ -308,114 +259,30 @@ export default class Gateway {
console.log(`USE: select <service>, reload, exit`)
await this.proxy.listen(this.state.proxyPort, this.state.internalIp)
if (useLoadSpinner) {
if (!this.selectedProcessInstance) {
this.std.detachAllServicesSTD()
this.std.attachAllServicesSTD()
}
}
}
onGatewayExit = (code, signal) => {
/**
* Clean up resources on gateway exit
*/
onGatewayExit = () => {
console.clear()
console.log(`\n🛑 Preparing to exit...`)
console.log(`Stoping proxy...`)
console.log(`Stopping proxy...`)
this.proxy.close()
console.log(`Kill all ${this.instancePool.length} instances...`)
for (let instance of this.instancePool) {
console.log(`Killing ${instance.id} [${instance.instance.pid}]`)
instance.instance.kill()
treeKill(instance.instance.pid)
}
console.log(`Stopping all services...`)
this.serviceManager.stopAllServices()
treeKill(process.pid)
}
std = {
reloadService: () => {
if (!this.selectedProcessInstance) {
console.error(`No service selected`)
return false
}
if (this.selectedProcessInstance === "all") {
return this.onAllServicesReload()
}
return this.selectedProcessInstance.instance.reload()
},
findServiceById: (id) => {
if (!isNaN(parseInt(id))) {
// find by index number
id = this.serviceRegistry[Object.keys(this.serviceRegistry)[id]]
} else {
// find by id
id = this.serviceRegistry[id]
}
return id
},
attachServiceSTD: (id) => {
console.log(`Attaching service [${id}]`)
if (id === "all") {
console.clear()
this.selectedProcessInstance = "all"
return this.std.attachAllServicesSTD()
}
const service = this.std.findServiceById(id)
if (!service) {
console.error(`Service [${service}] not found`)
return false
}
this.selectedProcessInstance = this.instancePool.find(
(instance) => instance.id === service.id,
)
if (!this.selectedProcessInstance) {
this.selectedProcessInstance = null
console.error(
`Cannot find service [${service.id}] in the instances pool`,
)
return false
}
this.std.detachAllServicesSTD()
console.clear()
this.selectedProcessInstance.instance.logs.attach()
return true
},
dettachServiceSTD: (id) => {},
attachAllServicesSTD: () => {
this.std.detachAllServicesSTD()
for (let service of this.instancePool) {
service.instance.logs.attach()
}
},
detachAllServicesSTD: () => {
for (let service of this.instancePool) {
service.instance.logs.detach()
}
},
}
/**
* Initialize the gateway and start all services
*/
async initialize() {
onExit(this.onGatewayExit)
// Increase limits to handle many services
process.stdout.setMaxListeners(150)
process.stderr.setMaxListeners(150)
@ -423,6 +290,12 @@ export default class Gateway {
this.proxy = new Proxy()
this.ipcRouter = new IPCRouter()
if (this.services.length === 0) {
console.error("❌ No services found")
return process.exit(1)
}
// Make key components available globally
global.eventBus = this.eventBus
global.ipcRouter = this.ipcRouter
global.proxy = this.proxy
@ -433,41 +306,37 @@ export default class Gateway {
`\nRunning ${chalk.bgBlue(`${pkg.name}`)} | ${chalk.bgMagenta(`[v${pkg.version}]`)} | ${this.state.internalIp} | ${isProduction ? "production" : "development"} \n\n\n`,
)
if (this.services.length === 0) {
console.error("❌ No services found")
return process.exit(1)
}
console.log(`📦 Found ${this.services.length} service(s)`)
// Watch for service state changes
Observable.observe(this.serviceRegistry, (changes) => {
const { type } = changes[0]
switch (type) {
case "update": {
if (
Object.values(this.serviceRegistry).every(
(service) => service.initialized,
)
) {
this.onAllServicesReady()
}
break
}
}
this.checkAllServicesReady()
})
await this.createServicesWatchers()
await this.createServicesProcess()
await this.createServicesRegistry()
await this.createServiceInstances()
// Initialize REPL interface
new RELP({
attachAllServicesSTD: this.std.attachAllServicesSTD,
detachAllServicesSTD: this.std.detachAllServicesSTD,
attachServiceSTD: this.std.attachServiceSTD,
dettachServiceSTD: this.std.dettachServiceSTD,
reloadService: this.std.reloadService,
attachAllServicesSTD: () =>
this.serviceManager.attachAllServicesStd(),
detachAllServicesSTD: () =>
this.serviceManager.detachAllServicesStd(),
attachServiceSTD: (id) => this.serviceManager.attachServiceStd(id),
dettachServiceSTD: (id) => this.serviceManager.detachServiceStd(id),
reloadService: () => {
const selectedService = this.serviceManager.getSelectedService()
if (!selectedService) {
console.error(`No service selected`)
return false
}
if (selectedService === "all") {
return this.reloadAllServices()
}
return selectedService.reload()
},
onAllServicesReady: this.onAllServicesReady,
})
}

View File

@ -0,0 +1,149 @@
/**
* ServiceManager class - Manages a collection of services
* Provides methods to interact with multiple services
*/
export default class ServiceManager {
constructor() {
this.services = []
this.selectedService = null
}
/**
* Add a service to the manager
* @param {Service} service - Service to add
*/
addService(service) {
this.services.push(service)
}
/**
* Get a service by ID
* @param {string} id - Service ID or index
* @returns {Service} The service or null if not found
*/
getService(id) {
// If ID is a number, treat it as an index
if (!isNaN(parseInt(id))) {
return this.services[parseInt(id)] || null
}
// Otherwise look up by ID
return this.services.find((service) => service.id === id) || null
}
/**
* Get the currently selected service
* @returns {Service|string|null} The selected service, "all", or null
*/
getSelectedService() {
return this.selectedService
}
/**
* Set the currently selected service
* @param {string} id - Service ID, index, or "all"
* @returns {boolean} True if selection was successful
*/
selectService(id) {
if (id === "all") {
this.selectedService = "all"
return true
}
const service = this.getService(id)
if (!service) {
console.error(`Service [${id}] not found`)
return false
}
this.selectedService = service
return true
}
/**
* Reload all services
*/
reloadAllServices() {
console.log("Reloading all services...")
for (const service of this.services) {
service.reload()
}
}
/**
* Stop all services
*/
stopAllServices() {
console.log("Stopping all services...")
for (const service of this.services) {
service.stop()
}
}
/**
* Attach to a specific service's standard output
* @param {string} id - Service ID or index
* @returns {boolean} True if attachment was successful
*/
attachServiceStd(id) {
console.log(`Attaching to service [${id}]`)
if (id === "all") {
this.selectedService = "all"
this.attachAllServicesStd()
return true
}
const service = this.getService(id)
if (!service) {
console.error(`Service [${id}] not found`)
return false
}
// Detach from all first
this.detachAllServicesStd()
// Then attach to the selected one
console.clear()
service.attachStd()
this.selectedService = service
return true
}
/**
* Detach from a specific service's standard output
* @param {string} id - Service ID or index
*/
detachServiceStd(id) {
const service = this.getService(id)
if (service) {
service.detachStd()
}
}
/**
* Attach to all services' standard output
*/
attachAllServicesStd() {
this.detachAllServicesStd()
for (const service of this.services) {
service.attachStd()
}
this.selectedService = "all"
}
/**
* Detach from all services' standard output
*/
detachAllServicesStd() {
for (const service of this.services) {
service.detachStd()
}
}
}

View File

@ -0,0 +1,190 @@
import chokidar from "chokidar"
import path from "path"
import spawnService from "../utils/spawnService"
import getIgnoredFiles from "../utils/getIgnoredFiles"
/**
* Service class - Represents a single service instance
* Manages the lifecycle, communication, and monitoring of a service
*/
export default class Service {
/**
* @param {object} config - Service configuration
* @param {string} config.id - Service identifier
* @param {string} config.version - Service version
* @param {string} config.path - Path to service entry file
* @param {string} config.cwd - Current working directory
* @param {boolean} config.isProduction - Whether running in production mode
* @param {string} config.internalIp - Internal IP address
* @param {object} handlers - Event handlers
* @param {Function} handlers.onReady - Called when service is ready
* @param {Function} handlers.onIPCData - Called when IPC data is received
* @param {Function} handlers.onServiceExit - Called when service exits
*/
constructor(config, handlers) {
this.id = config.id
this.version = config.version
this.path = config.path
this.cwd = config.cwd
this.isProduction = config.isProduction
this.internalIp = config.internalIp
this.instance = null
this.fileWatcher = null
this.handlers = handlers
}
/**
* Initialize the service and start its process
*/
async initialize() {
await this.startProcess()
if (!this.isProduction) {
await this.setupFileWatcher()
}
}
/**
* Start the service process
*/
async startProcess() {
this.instance = await spawnService({
id: this.id,
service: this.path,
cwd: this.cwd,
onReload: this.handleReload.bind(this),
onClose: this.handleClose.bind(this),
onError: this.handleError.bind(this),
onIPCData: this.handleIPCData.bind(this),
})
this.instance.logs.attach()
return this.instance
}
/**
* Set up file watcher for development hot-reload
*/
async setupFileWatcher() {
const ignored = [
...(await getIgnoredFiles(this.cwd)),
"**/.cache/**",
"**/node_modules/**",
"**/dist/**",
"**/build/**",
]
this.fileWatcher = chokidar.watch(this.cwd, {
ignored,
persistent: true,
ignoreInitial: true,
})
this.fileWatcher.on("all", (event, filePath) => {
console.log(`[${this.id}] File changed: ${event} ${filePath}`)
this.reload()
})
}
/**
* Handle IPC data from the service
* @param {string} id - Service ID
* @param {object} data - IPC data
*/
handleIPCData(id, data) {
if (this.handlers.onIPCData) {
this.handlers.onIPCData(this, data)
}
}
/**
* Handle service reload request
* @param {object} params - Reload parameters
*/
async handleReload(params) {
// The actual reload is handled by the reload() method
console.log(`[${this.id}] Handling reload request`)
}
/**
* Handle service closure
* @param {string} id - Service ID
* @param {number} code - Exit code
* @param {Error} error - Error if any
*/
async handleClose(id, code, error) {
if (this.handlers.onServiceExit) {
await this.handlers.onServiceExit(this, code, error)
}
// In production, we might want to restart the service
if (this.isProduction && code !== 0) {
console.error(`[${this.id}] Service crashed, restarting...`)
await new Promise((resolve) => setTimeout(resolve, 1000))
await this.reload()
}
}
/**
* Handle service errors
* @param {string} id - Service ID
* @param {Error} error - Error object
*/
handleError(id, error) {
console.error(`[${this.id}] Error:`, error)
}
/**
* Reload the service
*/
async reload() {
console.log(`[${this.id}] Reloading service...`)
// Kill the current process if is running
if (this.instance.exitCode !== null) {
await this.instance.kill("SIGINT")
}
// Start a new process
await this.startProcess()
return true
}
/**
* Stop the service
*/
async stop() {
console.log(`[${this.id}] Stopping service...`)
if (this.fileWatcher) {
await this.fileWatcher.close()
this.fileWatcher = null
}
if (this.instance) {
await this.instance.kill("SIGINT")
this.instance = null
}
}
/**
* Attach to service standard output
*/
attachStd() {
if (this.instance) {
this.instance.logs.attach()
}
}
/**
* Detach from service standard output
*/
detachStd() {
if (this.instance) {
this.instance.logs.detach()
}
}
}