support for new ws manager

This commit is contained in:
SrGooglo 2025-03-25 22:53:31 +00:00
parent e833038542
commit 2f77c50635
6 changed files with 396 additions and 259 deletions

View File

@ -1,18 +1,20 @@
import SessionModel from "../models/session"
import { reauthenticateWebsockets } from ".."
export default async () => {
__comty_shared_state.eventBus.emit("session:refreshing")
__comty_shared_state.refreshingToken = true
// send request to regenerate token
const response = await __comty_shared_state.baseRequest({
const response = await __comty_shared_state
.baseRequest({
method: "POST",
url: "/auth",
data: {
authToken: await SessionModel.token,
refreshToken: await SessionModel.refreshToken,
}
}).catch((error) => {
},
})
.catch((error) => {
return false
})
@ -36,7 +38,9 @@ export default async () => {
__comty_shared_state.eventBus.emit("session:refreshed")
__comty_shared_state.refreshingToken = false
reauthenticateWebsockets()
if (typeof __comty_shared_state.ws === "object") {
await __comty_shared_state.ws.connectAll()
}
return true
}

View File

@ -1,14 +1,14 @@
import pkg from "../package.json"
import EventEmitter from "@foxify/events"
import axios from "axios"
import { io } from "socket.io-client"
import { createHandlers } from "./models"
import AddonsManager from "./addons"
import WebsocketManager from "./ws"
import Storage from "./helpers/withStorage"
import remote from "./remote"
import Remotes from "./remotes"
globalThis.isServerMode = typeof window === "undefined" && typeof global !== "undefined"
globalThis.isServerMode =
typeof window === "undefined" && typeof global !== "undefined"
if (globalThis.isServerMode) {
const { Buffer } = require("buffer")
@ -21,146 +21,6 @@ if (globalThis.isServerMode) {
}
}
/**
* Creates websockets by disconnecting and removing listeners from existing instances,
* then creating new instances for each websocket in the remote.websockets array.
* Registers event listeners for connection, disconnection, reconnection, error, and any other events.
*
* @return {Promise<void>} A promise that resolves when all websockets have been created and event listeners have been registered.
*/
export async function createWebsockets() {
if (!remote.websockets) {
return false
}
const instances = globalThis.__comty_shared_state.sockets
for (let [key, instance] of Object.entries(instances)) {
if (instance.connected) {
// disconnect first
instance.disconnect()
}
// remove current listeners
instance.removeAllListeners()
delete globalThis.__comty_shared_state.sockets[key]
}
for (let ws of remote.websockets) {
let opts = {
transports: ["websocket"],
autoConnect: ws.autoConnect ?? true,
forceNew: true,
path: ws.path,
...ws.params ?? {},
}
if (ws.noAuth !== true) {
opts.auth = {
token: Storage.engine.get("token"),
}
}
globalThis.__comty_shared_state.sockets[ws.namespace] = io(remote.origin, opts)
}
// regsister events
for (let [key, instance] of Object.entries(instances)) {
instance.on("connect", () => {
//console.debug(`[WS-API][${key}] Connected`)
globalThis.__comty_shared_state.eventBus.emit(`${key}:connected`)
})
instance.on("disconnect", () => {
console.debug(`[WS-API][${key}] Disconnected`)
globalThis.__comty_shared_state.eventBus.emit(`${key}:disconnected`)
})
instance.on("reconnect", () => {
console.debug(`[WS-API][${key}] Reconnected`)
globalThis.__comty_shared_state.eventBus.emit(`${key}:reconnected`)
reauthenticateWebsockets()
})
instance.on("error", (error) => {
console.error(`[WS-API][${key}] Error`, error)
globalThis.__comty_shared_state.eventBus.emit(`${key}:error`, error)
})
instance.onAny((event, ...args) => {
console.debug(`[WS-API][${key}] Event (${event})`, ...args)
globalThis.__comty_shared_state.eventBus.emit(`${key}:${event}`, ...args)
})
}
}
/**
* Disconnects all websocket instances by calling the `disconnect` method on each instance.
*
* @return {Promise<void>} A promise that resolves when all websocket instances have been disconnected.
*/
export async function disconnectWebsockets() {
const instances = globalThis.__comty_shared_state.sockets
for (let [key, instance] of Object.entries(instances)) {
if (instance.connected) {
instance.disconnect()
}
}
}
/**
* Reconnects all websocket instances by disconnecting and reconnecting them with the current token.
*
* @return {Promise<void>} A promise that resolves when all websocket instances have been reconnected.
*/
export async function reconnectWebsockets() {
const instances = globalThis.__comty_shared_state.sockets
for (let [key, instance] of Object.entries(instances)) {
if (instance.connected) {
// disconnect first
instance.disconnect()
}
instance.auth = {
token: Storage.engine.get("token"),
}
instance.connect()
}
}
/**
* Reauthenticates all websocket instances with the current token. If a websocket instance is not connected, it connects to the server. If it is connected, it emits an "auth:reauth" event with the current token.
*
* @return {Promise<void>} Promise that resolves when all websocket instances have been reauthenticated.
*/
export async function reauthenticateWebsockets() {
const instances = globalThis.__comty_shared_state.sockets
for (let [key, instance] of Object.entries(instances)) {
const token = Storage.engine.get("token")
instance.auth = {
token: token,
}
if (!instance.connected) {
instance.connect()
} else {
instance.emit("auth:reauth", token)
}
}
}
/**
* Create a client with the specified access key, private key, and websocket enablement.
*
@ -171,18 +31,18 @@ export function createClient({
accessKey = null,
privateKey = null,
enableWs = false,
origin = remote.origin,
origin = Remotes.origin,
eventBus = new EventEmitter(),
} = {}) {
const sharedState = globalThis.__comty_shared_state = {
eventBus: new EventEmitter(),
const sharedState = (globalThis.__comty_shared_state = {
eventBus: eventBus,
mainOrigin: origin,
baseRequest: null,
sockets: new Map(),
ws: null,
rest: null,
version: pkg.version,
}
sharedState.rest = createHandlers()
addons: new AddonsManager(),
})
if (privateKey && accessKey && globalThis.isServerMode) {
Storage.engine.set("token", `${accessKey}:${privateKey}`)
@ -192,7 +52,7 @@ export function createClient({
baseURL: origin,
headers: {
"Content-Type": "application/json",
}
},
})
// create a interceptor to attach the token every request
@ -202,7 +62,8 @@ export function createClient({
const sessionToken = Storage.engine.get("token")
if (sessionToken) {
config.headers["Authorization"] = `${globalThis.isServerMode ? "Server" : "Bearer"} ${sessionToken}`
config.headers["Authorization"] =
`${globalThis.isServerMode ? "Server" : "Bearer"} ${sessionToken}`
} else {
console.warn("Making a request with no session token")
}
@ -211,8 +72,9 @@ export function createClient({
return config
})
if (enableWs) {
createWebsockets()
if (enableWs == true) {
__comty_shared_state.ws = new WebsocketManager()
sharedState.ws.connectAll()
}
return sharedState

View File

@ -1,35 +0,0 @@
const envOrigins = {
"development": `https://fr01.ragestudio.net:9000`,//`${location.origin}/api`,
"indev": "https://indev.comty.app/api",
"production": "https://api.comty.app",
}
export default {
origin: envOrigins[process.env.NODE_ENV ?? "production"],
websockets: [
{
namespace: "posts",
path: "/posts",
},
{
namespace: "main",
path: "/main",
},
{
namespace: "notifications",
path: "/notifications",
},
{
namespace: "chats",
path: "/chats",
},
{
namespace: "music",
path: "/music",
}
// {
// namespace: "payments",
// path: "/payments",
// }
]
}

36
src/remotes.js Executable file
View File

@ -0,0 +1,36 @@
const envOrigins = {
development: `https://fr01.ragestudio.net:9000`, //`${location.origin}/api`,
indev: "https://indev.comty.app/api",
production: "https://api.comty.app",
}
export default {
origin: envOrigins[process.env.NODE_ENV ?? "production"],
websockets: [
{
namespace: "posts",
path: "/posts",
ng: true,
},
{
namespace: "main",
path: "/main",
},
{
namespace: "notifications",
path: "/notifications",
},
{
namespace: "chats",
path: "/chats",
},
{
namespace: "music",
path: "/music",
},
// {
// namespace: "payments",
// path: "/payments",
// }
],
}

131
src/rtclient.js Normal file
View File

@ -0,0 +1,131 @@
export class RTEngineClient {
constructor(params = {}) {
this.params = params
}
socket = null
stateSubscribers = []
joinedTopics = new Set()
handlers = new Set()
async connect() {
return new Promise((resolve, reject) => {
if (this.socket) {
this.disconnect()
}
let url = `${this.params.url}`
if (this.params.token) {
url += `?token=${this.params.token}`
}
this.socket = new WebSocket(url)
this.socket.onopen = () => {
resolve()
this._emit("connect")
}
this.socket.onclose = () => {
this._emit("disconnect")
}
this.socket.onerror = () => {
reject()
this._emit("error")
}
this.socket.onmessage = (event) => this.handleMessage(event)
})
}
async disconnect() {
if (!this.socket) {
return false
}
for await (const topic of this.joinedTopics) {
this.leaveTopic(topic)
}
this.socket.close()
this.socket = null
}
_emit(event, data) {
for (const handler of this.handlers) {
if (handler.event === event) {
handler.handler(data)
}
}
}
on = (event, handler) => {
this.handlers.add({
event,
handler,
})
}
off = (event, handler) => {
this.handlers.delete({
event,
handler,
})
}
emit = (event, data) => {
if (!this.socket) {
throw new Error("Failed to send, socket not connected")
}
this.socket.send(JSON.stringify({ event, data }))
}
joinTopic = (topic) => {
this.emit("topic:join", topic)
this.joinedTopics.add(topic)
}
leaveTopic = (topic) => {
this.emit("topic:leave", topic)
this.joinedTopics.delete(topic)
}
updateState(state) {
this.stateSubscribers.forEach((callback) => callback(state))
}
//* HANDLERS
handleMessage(event) {
try {
const payload = JSON.parse(event.data)
if (typeof payload.event !== "string") {
return false
}
if (payload.event === "error") {
console.error(payload.data)
return false
}
this._emit(payload.event, payload.data)
} catch (error) {
console.error("Error handling message:", error)
}
}
// UPDATERS
onStateChange(callback) {
this.stateSubscribers.push(callback)
return () => {
this.stateSubscribers = this.stateSubscribers.filter(
(cb) => cb !== callback,
)
}
}
}
export default RTEngineClient

139
src/ws.js Normal file
View File

@ -0,0 +1,139 @@
import Remotes from "./remotes"
import Storage from "./helpers/withStorage"
import { io } from "socket.io-client"
import RTClient from "./rtclient"
class WebsocketManager {
sockets = new Map()
async connect(remote) {
let opts = {
transports: ["websocket"],
autoConnect: remote.autoConnect ?? true,
forceNew: true,
path: remote.path,
...(remote.params ?? {}),
}
if (remote.noAuth !== true) {
opts.auth = {
token: Storage.engine.get("token"),
}
}
const socket = io(Remotes.origin, opts)
socket.on("connect", () => {
globalThis.__comty_shared_state.eventBus.emit(
`wsmanager:${remote.namespace}:connected`,
)
})
socket.on("disconnect", () => {
globalThis.__comty_shared_state.eventBus.emit(
`wsmanager:${remote.namespace}:disconnected`,
)
})
socket.on("error", (error) => {
globalThis.__comty_shared_state.eventBus.emit(
`wsmanager:${remote.namespace}:error`,
error,
)
})
this.sockets.set(remote.namespace, socket)
return socket
}
async connectNg(remote) {
console.warn(
`Creating experimental socket client, some features may not work as expected:`,
remote,
)
const client = new RTClient({
url: `${Remotes.origin}/${remote.namespace}`,
token: Storage.engine.get("token"),
})
client.on("connect", () => {
globalThis.__comty_shared_state.eventBus.emit(
`wsmanager:${remote.namespace}:connected`,
)
})
client.on("disconnect", () => {
globalThis.__comty_shared_state.eventBus.emit(
`wsmanager:${remote.namespace}:disconnected`,
)
})
client.on("error", (error) => {
globalThis.__comty_shared_state.eventBus.emit(
`wsmanager:${remote.namespace}:error`,
error,
)
})
await client.connect()
this.sockets.set(remote.namespace, client)
return client
}
async disconnect(key) {
const socket = this.sockets.get(key)
if (!socket) {
return null
}
if (
socket.connected === true &&
typeof socket.disconnect === "function"
) {
await socket.disconnect()
}
if (typeof socket.removeAllListeners === "function") {
await socket.removeAllListeners()
}
this.sockets.delete(key)
}
async connectAll() {
if (this.sockets.size > 0) {
await this.disconnectAll()
}
for await (const remote of Remotes.websockets) {
try {
if (remote.ng === true) {
await this.connectNg(remote)
} else {
await this.connect(remote)
}
} catch (error) {
globalThis.__comty_shared_state.eventBus.emit(
`wsmanager:${remote.namespace}:error`,
error,
)
}
}
globalThis.__comty_shared_state.eventBus.emit("wsmanager:all:connected")
}
async disconnectAll() {
for (const [key, socket] of this.sockets) {
await this.disconnect(key)
}
}
}
export default WebsocketManager