abstract client

This commit is contained in:
SrGooglo 2025-03-25 22:46:25 +00:00
parent fe1961e793
commit fcb4672624
3 changed files with 110 additions and 51 deletions

View File

@ -0,0 +1,47 @@
class Client {
constructor(socket) {
this.socket = socket
this.id = socket.context.id
this.userId = socket.context.user?._id || null
this.authed = !!socket.context.session
}
emit(event, data) {
const payload = JSON.stringify({ event, data })
return this.socket.send(payload)
}
toTopic(topic, event, data, self = false) {
const payload = JSON.stringify({
topic,
event,
data,
})
this.socket.publish(topic, payload)
if (self === true) {
this.emit(event, data)
}
}
error(error) {
if (error instanceof Error) {
error = error.toString()
}
return this.emit("error", error)
}
subscribe(topic) {
return this.socket.subscribe(topic)
}
unsubscribe(topic) {
return this.socket.unsubscribe(topic)
}
}
export default Client

View File

@ -0,0 +1,8 @@
export default {
"topic:join": async (client, topic) => {
client.subscribe(topic)
},
"topic:leave": async (client, topic) => {
client.unsubscribe(topic)
},
}

View File

@ -1,5 +1,8 @@
import HyperExpress from "hyper-express" import HyperExpress from "hyper-express"
import Client from "./client"
import BuiltInEvents from "./events"
class RTEngineNG { class RTEngineNG {
constructor(config = {}) { constructor(config = {}) {
this.events = new Map() this.events = new Map()
@ -10,85 +13,84 @@ class RTEngineNG {
} }
} }
this.onUpgrade = config.onUpgrade || null for (const [event, handler] of Object.entries(BuiltInEvents)) {
this.onConnection = config.onConnection || null this.events.set(event, handler)
this.onDisconnection = config.onDisconnection || null
} }
clients = new Set() this.onUpgrade = config.onUpgrade || null
this.onConnection = config.onConnection || null
this.onDisconnect = config.onDisconnect || null
}
clients = new Map()
router = new HyperExpress.Router() router = new HyperExpress.Router()
senders = { senders = {
broadcast: async (event, data) => { broadcast: async (event, data) => {
for (const client of this.clients) { for (const [socketId, client] of this.clients) {
this.sendMessage(client, event, data) client.emit(event, data)
} }
}, },
toTopic: async (topic, event, data) => {
if (!this.engine) {
throw new Error("Engine not initialized")
}
return this.engine.app.publish(
topic,
JSON.stringify({
topic: topic,
event: event,
data: data,
}),
)
},
} }
sendMessage = (socket, event, data) => { find = {
const payload = JSON.stringify({ event, data }) clientsByUserId: (userId) => {
const clients = []
socket.send(payload) for (const [socketId, client] of this.clients) {
} if (client.userId === userId) {
clients.push(client)
sendToTopic = (socket, topic, event, data, self = false) => {
const payload = JSON.stringify({
topic,
event,
data,
})
socket.publish(topic, payload)
if (self === true) {
this.sendMessage(socket, event, data)
} }
} }
sendError = (socket, error) => { return clients
if (error instanceof Error) { },
error = error.toString()
}
this.sendMessage(socket, "error", error)
} }
handleMessage = async (socket, payload) => { handleMessage = async (socket, payload) => {
const client = this.clients.get(socket.context.id)
if (!client) {
return socket.send(
JSON.stringify({ event: "error", data: "Client not found" }),
)
}
let message = null let message = null
try { try {
message = JSON.parse(payload) message = JSON.parse(payload)
if (typeof message.event !== "string") { if (typeof message.event !== "string") {
return this.sendError(socket, "Invalid event type") return client.error("Invalid event type")
} }
const handler = this.events.get(message.event) const handler = this.events.get(message.event)
if (typeof handler === "function") { if (typeof handler === "function") {
const handlerSenders = { await handler(client, message.data)
...this.senders,
toTopic: (room, event, data, self) => {
this.sendToTopic(socket, room, event, data, self)
},
send: (event, data) => {
this.sendMessage(socket, event, data)
},
error: (error) => {
this.sendError(socket, error)
},
}
await handler(socket, message.data, handlerSenders)
} else { } else {
console.log(`[ws] 404 /${message.event}`) console.log(`[ws] 404 /${message.event}`)
this.sendError(socket, "Event handler not found") client.error("Event handler not found")
} }
} catch (error) { } catch (error) {
console.log(`[ws] 500 /${message?.event ?? "unknown"} >`, error) console.log(`[ws] 500 /${message?.event ?? "unknown"} >`, error)
this.sendError(socket, error) client.error(error)
} }
} }
@ -98,17 +100,19 @@ class RTEngineNG {
} }
socket.on("message", (payload) => this.handleMessage(socket, payload)) socket.on("message", (payload) => this.handleMessage(socket, payload))
socket.on("close", () => this.handleDisconnection(socket)) socket.on("close", () => this.handleDisconnect(socket))
this.clients.add(socket) const client = new Client(socket)
this.clients.set(socket.context.id, client)
} }
handleDisconnection = async (socket) => { handleDisconnect = async (socket) => {
if (this.onDisconnection) { if (typeof this.onDisconnect === "function") {
await this.onDisconnection(socket) await this.onDisconnect(socket)
} }
this.clients.delete(socket) this.clients.delete(socket.context.id)
} }
handleUpgrade = async (req, res) => { handleUpgrade = async (req, res) => {