diff --git a/packages/music_server/src/api.js b/packages/music_server/src/api.js index 5ccb1b4a..681c750e 100755 --- a/packages/music_server/src/api.js +++ b/packages/music_server/src/api.js @@ -10,7 +10,7 @@ import DbManager from "@shared-classes/DbManager" import RedisClient from "@shared-classes/RedisClient" import StorageClient from "@shared-classes/StorageClient" -import RoomServer from "./roomsServer" +import WebsocketServer from "./ws" import pkg from "../package.json" @@ -24,8 +24,7 @@ export default class Server { constructor(options = {}) { this.server = express() this._http = http.createServer(this.server) - - this.websocketServer = new RoomServer(this._http) + this.websocketServer = global.ws = new WebsocketServer(this._http) this.options = { listenHost: process.env.HTTP_LISTEN_IP ?? "0.0.0.0", diff --git a/packages/music_server/src/roomsServer.js b/packages/music_server/src/classes/Room/index.js similarity index 59% rename from packages/music_server/src/roomsServer.js rename to packages/music_server/src/classes/Room/index.js index 0e1771e0..f6b18865 100644 --- a/packages/music_server/src/roomsServer.js +++ b/packages/music_server/src/classes/Room/index.js @@ -1,45 +1,7 @@ -import socketio from "socket.io" -import { createAdapter } from "@socket.io/cluster-adapter" -import { setupWorker } from "@socket.io/sticky" +import generateFnHandler from "@utils/generateFnHandler" +import composePayloadData from "@utils/composePayloadData" -import withWsAuth from "@middlewares/withWsAuth" - -function generateFnHandler(fn, socket) { - return async (...args) => { - if (typeof socket === "undefined") { - socket = arguments[0] - } - - try { - fn(socket, ...args) - } catch (error) { - console.error(`[HANDLER_ERROR] ${error.message} >`, error.stack) - - if (typeof socket.emit !== "function") { - return false - } - - return socket.emit("error", { - message: error.message, - }) - } - } -} - -function composePayloadData(socket, data = {}) { - return { - user: { - user_id: socket.userData._id, - username: socket.userData.username, - fullName: socket.userData.fullName, - avatar: socket.userData.avatar, - }, - command_issuer: data.command_issuer ?? socket.userData._id, - ...data - } -} - -class Room { +export default class Room { constructor(io, roomId, roomOptions = { title: "Untitled Room" }) { if (!io) { throw new Error("io is required") @@ -380,211 +342,4 @@ class Room { makeOwner = (socket) => { this.ownerUserId = socket.userData._id } -} - -class RoomsController { - constructor(io) { - if (!io) { - throw new Error("io is required") - } - - this.io = io - } - - rooms = [] - - checkRoomExists = (roomId) => { - return this.rooms.some((room) => room.roomId === roomId) - } - - createRoom = async (roomId, roomOptions) => { - if (this.checkRoomExists(roomId)) { - throw new Error(`Room ${roomId} already exists`) - } - - const room = new Room(this.io, roomId, roomOptions) - - this.rooms.push(room) - - return room - } - - connectSocketToRoom = async (socket, roomId, roomOptions) => { - let room = null - - if (!this.checkRoomExists(roomId)) { - room = await this.createRoom(roomId, roomOptions) - - // make owner - room.makeOwner(socket) - } - - // check if user is already connected to a room - if (socket.connectedRoomId) { - console.warn(`[${socket.id}][@${socket.userData.username}] already connected to room ${socket.connectedRoomId}`) - - this.disconnectSocketFromRoom(socket) - } - - if (!room) { - room = this.rooms.find((room) => room.roomId === roomId) - } - - return room.join(socket) - } - - disconnectSocketFromRoom = async (socket, roomId) => { - if (!roomId) { - roomId = socket.connectedRoomId - } - - if (!this.checkRoomExists(roomId)) { - console.warn(`Cannot disconnect socket [${socket.id}][@${socket.userData.username}] from room ${roomId}, room does not exists`) - return false - } - - const room = this.rooms.find((room) => room.roomId === roomId) - - // if owners leaves, rotate owner to the next user - if (socket.userData._id === room.ownerUserId) { - if (room.connections.length > 0 && room.connections[1]) { - room.transferOwner(room.connections[1]) - } - } - - // leave - room.leave(socket) - - // if room is empty, destroy it - if (room.connections.length === 0) { - await this.destroyRoom(roomId) - - return true - } - - return true - } - - destroyRoom = async (roomId) => { - if (!this.checkRoomExists(roomId)) { - throw new Error(`Room ${roomId} does not exists`) - } - - const room = this.rooms.find((room) => room.roomId === roomId) - - room.destroy() - - this.rooms.splice(this.rooms.indexOf(room), 1) - - return true - } -} - -export default class RoomsServer { - constructor(server) { - this.io = socketio(server, { - cors: { - origin: "*", - methods: ["GET", "POST"], - credentials: true, - } - }) - - if (global.ioAdapter) { - this.io.adapter(global.ioAdapter) - } - - this.RoomsController = new RoomsController(this.io) - } - - connectionPool = [] - - events = { - - } - - inviteUserToRoom = async (socket, data) => { - try { - // find sockets with matching user_id - const invitedSockets = this.connectionPool.filter((client) => client.userData._id === data.user_id) - - if (invitedSockets.length === 0) { - console.warn(`[${socket.id}][@${socket.userData.username}] cannot invite user ${data.user_id}, user not found in connection pool`) - return socket.emit("error", { - message: `User ${data.user_id} not found`, - }) - } - - for (const invitedSocket of invitedSockets) { - // check if user is already connected to the room - if (invitedSocket.connectedRoomId === data.roomId) { - console.warn(`[${socket.id}][@${socket.userData.username}] cannot invite user ${data.user_id}, user already connected to room ${data.roomId}`) - return false - } - - console.log(`[${socket.id}][@${socket.userData.username}] inviting user ${data.user_id} to room ${data.roomId}`) - - invitedSocket.emit("invite:received", { - roomId: data.roomId, - invitedBy: { - _id: socket.userData._id, - username: socket.userData.username, - fullName: socket.userData.fullName, - avatar: socket.userData.avatar, - }, - }) - } - } catch (error) { - return socket.emit("error", { - message: error.message, - }) - } - } - - initialize = async () => { - this.io.use(withWsAuth) - - this.io.on("connection", (socket) => { - try { - console.log(`[${socket.id}][${socket.userData.username}] connected to hub.`) - - this.connectionPool.push(socket) - - socket.on("disconnect", () => this.events.disconnect) - - // Rooms - socket.on("join:room", (data) => this.RoomsController.connectSocketToRoom(socket, data.room, data.options)) - socket.on("leave:room", (data) => this.RoomsController.disconnectSocketFromRoom(socket, data?.room ?? socket.connectedRoomId, data?.options ?? {})) - socket.on("invite:user", generateFnHandler(this.inviteUserToRoom, socket)) - - socket.on("ping", (callback) => { - callback() - }) - - socket.on("disconnect", (_socket) => { - console.log(`[${socket.id}][@${socket.userData.username}] disconnected to hub.`) - - if (socket.connectedRoomId) { - console.log(`[${socket.id}][@${socket.userData.username}] was connected to room [${socket.connectedRoomId}], leaving...`) - this.RoomsController.disconnectSocketFromRoom(socket) - } - - // remove from connection pool - this.connectionPool = this.connectionPool.filter((client) => client.id !== socket.id) - }) - - Object.entries(this.events).forEach(([event, handler]) => { - socket.on(event, (data) => { - try { - handler(socket, data) - } catch (error) { - console.error(error) - } - }) - }) - } catch (error) { - console.error(error) - } - }) - } -} +} \ No newline at end of file diff --git a/packages/music_server/src/classes/RoomsController/index.js b/packages/music_server/src/classes/RoomsController/index.js new file mode 100644 index 00000000..42885da6 --- /dev/null +++ b/packages/music_server/src/classes/RoomsController/index.js @@ -0,0 +1,99 @@ +import Room from "@classes/Room" + +export default class RoomsController { + constructor(io) { + if (!io) { + throw new Error("io is required") + } + + this.io = io + } + + rooms = [] + + checkRoomExists = (roomId) => { + return this.rooms.some((room) => room.roomId === roomId) + } + + createRoom = async (roomId, roomOptions) => { + if (this.checkRoomExists(roomId)) { + throw new Error(`Room ${roomId} already exists`) + } + + const room = new Room(this.io, roomId, roomOptions) + + this.rooms.push(room) + + return room + } + + connectSocketToRoom = async (socket, roomId, roomOptions) => { + let room = null + + if (!this.checkRoomExists(roomId)) { + room = await this.createRoom(roomId, roomOptions) + + // make owner + room.makeOwner(socket) + } + + // check if user is already connected to a room + if (socket.connectedRoomId) { + console.warn(`[${socket.id}][@${socket.userData.username}] already connected to room ${socket.connectedRoomId}`) + + this.disconnectSocketFromRoom(socket) + } + + if (!room) { + room = this.rooms.find((room) => room.roomId === roomId) + } + + return room.join(socket) + } + + disconnectSocketFromRoom = async (socket, roomId) => { + if (!roomId) { + roomId = socket.connectedRoomId + } + + if (!this.checkRoomExists(roomId)) { + console.warn(`Cannot disconnect socket [${socket.id}][@${socket.userData.username}] from room ${roomId}, room does not exists`) + return false + } + + const room = this.rooms.find((room) => room.roomId === roomId) + + // if owners leaves, rotate owner to the next user + if (socket.userData._id === room.ownerUserId) { + if (room.connections.length > 0 && room.connections[1]) { + room.transferOwner(room.connections[1]) + } + } + + // leave + room.leave(socket) + + // if room is empty, destroy it + if (room.connections.length === 0) { + await this.destroyRoom(roomId) + + return true + } + + return true + } + + destroyRoom = async (roomId) => { + if (!this.checkRoomExists(roomId)) { + throw new Error(`Room ${roomId} does not exists`) + } + + const room = this.rooms.find((room) => room.roomId === roomId) + + room.destroy() + + this.rooms.splice(this.rooms.indexOf(room), 1) + + return true + } +} diff --git a/packages/music_server/src/index.js b/packages/music_server/src/index.js index 57895a24..cb83e8f8 100755 --- a/packages/music_server/src/index.js +++ b/packages/music_server/src/index.js @@ -1,4 +1,3 @@ - import { webcrypto as crypto } from "crypto" import path from "path" import { registerBaseAliases } from "linebridge/dist/server" @@ -61,18 +60,31 @@ import API from "./api" async function main() { if (process.env.INFISICAL_TOKEN) { + console.log("🔑 Fetching secrets from Infisical...") + const client = new infisical({ token: process.env.INFISICAL_TOKEN, }) const secrets = await client.getAllSecrets() + console.log("🔑 Injecting secrets to process.env...",) + // inject to process.env secrets.forEach((secret) => { - process.env[secret.secretName] = secret.secretValue + if (!(secret.secretName in process.env)) { + process.env[secret.secretName] = secret.secretValue + } }) } + // transform "undefined" or "null" envs to undefined + Object.keys(process.env).forEach((key) => { + if (process.env[key] === "undefined" || process.env[key] === "null") { + process.env[key] = undefined + } + }) + const instance = new API() await instance.initialize() diff --git a/packages/music_server/src/utils/composePayloadData/index.js b/packages/music_server/src/utils/composePayloadData/index.js new file mode 100644 index 00000000..de5761f5 --- /dev/null +++ b/packages/music_server/src/utils/composePayloadData/index.js @@ -0,0 +1,12 @@ +export default function composePayloadData(socket, data = {}) { + return { + user: { + user_id: socket.userData._id, + username: socket.userData.username, + fullName: socket.userData.fullName, + avatar: socket.userData.avatar, + }, + command_issuer: data.command_issuer ?? socket.userData._id, + ...data + } +} \ No newline at end of file diff --git a/packages/music_server/src/utils/generateFnHandler/index.js b/packages/music_server/src/utils/generateFnHandler/index.js new file mode 100644 index 00000000..4c5962d5 --- /dev/null +++ b/packages/music_server/src/utils/generateFnHandler/index.js @@ -0,0 +1,21 @@ +export default function generateFnHandler(fn, socket) { + return async (...args) => { + if (typeof socket === "undefined") { + socket = arguments[0] + } + + try { + fn(socket, ...args) + } catch (error) { + console.error(`[HANDLER_ERROR] ${error.message} >`, error.stack) + + if (typeof socket.emit !== "function") { + return false + } + + return socket.emit("error", { + message: error.message, + }) + } + } +} \ No newline at end of file diff --git a/packages/music_server/src/ws.js b/packages/music_server/src/ws.js new file mode 100644 index 00000000..63aed8ae --- /dev/null +++ b/packages/music_server/src/ws.js @@ -0,0 +1,117 @@ +import socketio from "socket.io" +import generateFnHandler from "@utils/generateFnHandler" + +import withWsAuth from "@middlewares/withWsAuth" + +import RoomsController from "@classes/RoomsController" + +export default class WebsocketServer { + constructor(server) { + this.io = socketio(server, { + cors: { + origin: "*", + methods: ["GET", "POST"], + credentials: true, + } + }) + + if (global.ioAdapter) { + this.io.adapter(global.ioAdapter) + } + + this.RoomsController = new RoomsController(this.io) + + return this + } + + connectionPool = [] + + events = { + + } + + inviteUserToRoom = async (socket, data) => { + try { + // find sockets with matching user_id + const invitedSockets = this.connectionPool.filter((client) => client.userData._id === data.user_id) + + if (invitedSockets.length === 0) { + console.warn(`[${socket.id}][@${socket.userData.username}] cannot invite user ${data.user_id}, user not found in connection pool`) + return socket.emit("error", { + message: `User ${data.user_id} not found`, + }) + } + + for (const invitedSocket of invitedSockets) { + // check if user is already connected to the room + if (invitedSocket.connectedRoomId === data.roomId) { + console.warn(`[${socket.id}][@${socket.userData.username}] cannot invite user ${data.user_id}, user already connected to room ${data.roomId}`) + return false + } + + console.log(`[${socket.id}][@${socket.userData.username}] inviting user ${data.user_id} to room ${data.roomId}`) + + invitedSocket.emit("invite:received", { + roomId: data.roomId, + invitedBy: { + _id: socket.userData._id, + username: socket.userData.username, + fullName: socket.userData.fullName, + avatar: socket.userData.avatar, + }, + }) + } + } catch (error) { + return socket.emit("error", { + message: error.message, + }) + } + } + + initialize = async () => { + this.io.use(withWsAuth) + + this.io.on("connection", (socket) => { + try { + console.log(`[${socket.id}][${socket.userData.username}] connected to hub.`) + + this.connectionPool.push(socket) + + socket.on("disconnect", () => this.events.disconnect) + + // Rooms + socket.on("join:room", (data) => this.RoomsController.connectSocketToRoom(socket, data.room, data.options)) + socket.on("leave:room", (data) => this.RoomsController.disconnectSocketFromRoom(socket, data?.room ?? socket.connectedRoomId, data?.options ?? {})) + socket.on("invite:user", generateFnHandler(this.inviteUserToRoom, socket)) + + socket.on("ping", (callback) => { + callback() + }) + + socket.on("disconnect", (_socket) => { + console.log(`[${socket.id}][@${socket.userData.username}] disconnected to hub.`) + + if (socket.connectedRoomId) { + console.log(`[${socket.id}][@${socket.userData.username}] was connected to room [${socket.connectedRoomId}], leaving...`) + this.RoomsController.disconnectSocketFromRoom(socket) + } + + // remove from connection pool + this.connectionPool = this.connectionPool.filter((client) => client.id !== socket.id) + }) + + Object.entries(this.events).forEach(([event, handler]) => { + socket.on(event, (data) => { + try { + handler(socket, data) + } catch (error) { + console.error(error) + } + }) + }) + } catch (error) { + console.error(error) + } + }) + } +}