implement music_server server

This commit is contained in:
SrGooglo 2023-05-24 17:41:09 +00:00
parent 55a78a9b70
commit 318a62fe35
19 changed files with 807 additions and 424 deletions

View File

@ -1,5 +1,5 @@
{
"name": "@comty/music_spaces_server",
"name": "@comty/music_server",
"version": "0.46.1",
"main": "dist/index.js",
"scripts": {
@ -9,8 +9,9 @@
"license": "MIT",
"dependencies": {
"@foxify/events": "^2.1.0",
"axios": "^1.2.1",
"axios": "^1.4.0",
"bcrypt": "5.0.1",
"comty.js": "^0.46.1",
"corenode": "0.28.26",
"cors": "^2.8.5",
"dotenv": "^16.0.3",
@ -21,11 +22,10 @@
"moment-timezone": "0.5.37",
"morgan": "^1.10.0",
"nanoid": "3.2.0",
"socket.io": "^4.5.4",
"spotify-ws": "^0.1.1"
"socket.io": "^4.5.4"
},
"devDependencies": {
"cross-env": "^7.0.3",
"nodemon": "^2.0.15"
}
}
}

264
packages/music_server/src/api.js Normal file → Executable file
View File

@ -1,235 +1,48 @@
import fs from "fs"
import path from "path"
import express from "express"
import http from "http"
import cors from "cors"
import morgan from "morgan"
import socketio from "socket.io"
import EventEmitter from "@foxify/events"
import jwt from "jsonwebtoken"
import axios from "axios"
import ComtyClient from "@classes/ComtyClient"
import routes from "./routes"
const mainAPI = axios.create({
baseURL: process.env.MAIN_API_URL ?? "http://localhost:3010",
headers: {
"server_token": `${process.env.MAIN_SERVER_ID}:${process.env.MAIN_SERVER_TOKEN}`,
}
})
class SpotifyRoom {
constructor(options = {}) {
this.options = {
...options,
}
this.owner_user_id = options.owner_user_id
this.roomId = options.roomId
}
owner = null
listeners = []
appendListener = (listener) => {
}
initialize = async () => {
}
}
class RealtimeRoomEventServer {
constructor(server, options = {}) {
this.io = socketio(server, {
cors: {
origin: "*",
methods: ["GET", "POST"],
credentials: true,
}
})
this.limitations = {
...options.limitations,
}
}
connectionPool = []
roomEventsHandlers = {
"owner:update": (socket, payload) => {
}
}
initializeSocketIO = () => {
this.io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token
if (!token) {
return next(new Error(`auth:token_missing`))
}
const session = await mainAPI.post("/session/validate", {
session: token
})
.then((res) => {
return res.data
})
.catch((err) => {
console.log(err.response.data)
return false
})
if (!session || !session?.valid) {
return next(new Error(`auth:token_invalid`))
}
const userData = await mainAPI.get(`/user/${session.user_id}/data`)
.then((res) => {
return res.data
})
.catch((err) => {
console.log(err)
return null
})
if (!userData) {
return next(new Error(`auth:user_failed`))
}
try {
// try to decode the token and get the user's username
const decodedToken = jwt.decode(token)
socket.userData = userData
socket.token = token
socket.decodedToken = decodedToken
}
catch (err) {
return next(new Error(`auth:decode_failed`))
}
console.log(`[${socket.id}] connected`)
next()
} catch (error) {
next(new Error(`auth:authentification_failed`))
}
})
this.io.on("connection", (socket) => {
socket.on("join", (...args) => this.handleClientJoin(socket, ...args))
socket.on("disconnect", () => {
this.handleClientDisconnect(socket)
})
})
}
async handleClientJoin(socket, payload, cb) {
const { room, type } = payload
if (!room) {
return cb(new Error(`room:invalid`))
}
socket.connectedRoom = room
const pool = await this.attachClientToPool(socket, room).catch((err) => {
cb(err)
return null
})
if (!pool) return
console.log(`[${socket.id}] joined room [${room}]`)
socket.join(room)
Object.keys(this.roomEventsHandlers).forEach((event) => {
socket.on(event, (...args) => this.roomEventsHandlers[event](socket, ...args))
})
// start spotify ws connection
const roomConnections = this.connectionPool.filter((client) => client.room === room).length
return cb(null, {
roomConnections,
limitations: this.limitations,
})
}
handleClientDisconnect(socket) {
const index = this.connectionPool.findIndex((client) => client.id === socket.id)
if (index === -1) return
return this.connectionPool.splice(index, 1)
}
async attachClientToPool(socket, room) {
// TODO: check if user can join room or is privated
if (!room) {
throw new Error(`room:invalid`)
}
return this.connectionPool.push({
id: socket.id,
room,
socket
})
}
}
import RoomServer from "./roomsServer"
export default class Server {
constructor(options = {}) {
this.app = express()
this.httpServer = http.createServer(this.app)
this.roomServer = new RealtimeRoomEventServer(this.httpServer)
this.websocketServer = new RoomServer(this.httpServer)
this.options = {
listenPort: process.env.PORT || 3030,
listenHost: process.env.LISTEN_HOST || "0.0.0.0",
listenPort: process.env.LISTEN_PORT || 3050,
...options
}
}
comty = global.comty = ComtyClient()
eventBus = global.eventBus = new EventEmitter()
initialize = async () => {
this.app.use(cors())
this.app.use(express.json({ extended: false }))
this.app.use(express.urlencoded({ extended: true }))
async __registerInternalMiddlewares() {
let middlewaresPath = fs.readdirSync(path.resolve(__dirname, "useMiddlewares"))
// Use logger if not in production
if (!process.env.NODE_ENV === "production") {
this.app.use(morgan("dev"))
for await (const middlewarePath of middlewaresPath) {
const middleware = require(path.resolve(__dirname, "useMiddlewares", middlewarePath)).default
if (!middleware) {
console.error(`Middleware ${middlewarePath} not found.`)
continue
}
this.app.use(middleware)
}
await this.roomServer.initializeSocketIO()
await this.registerBaseRoute()
await this.registerRoutes()
await this.httpServer.listen(this.options.listenPort)
return {
listenPort: this.options.listenPort,
}
}
async registerBaseRoute() {
await this.app.get("/", async (req, res) => {
return res.json({
uptimeMinutes: Math.floor(process.uptime() / 60),
})
})
}
registerRoutes() {
@ -247,4 +60,35 @@ export default class Server {
this.app.use(route.use, ...order)
})
}
async registerBaseRoute() {
await this.app.get("/", async (req, res) => {
return res.json({
uptimeMinutes: Math.floor(process.uptime() / 60),
})
})
}
initialize = async () => {
const startHrTime = process.hrtime()
await this.websocketServer.initialize()
await this.__registerInternalMiddlewares()
this.app.use(express.json({ extended: false }))
this.app.use(express.urlencoded({ extended: true }))
await this.registerBaseRoute()
await this.registerRoutes()
await this.httpServer.listen(this.options.listenPort, this.options.listenHost)
// calculate elapsed time
const elapsedHrTime = process.hrtime(startHrTime)
const elapsedTimeInMs = elapsedHrTime[0] * 1000 + elapsedHrTime[1] / 1e6
// log server started
console.log(`🚀 Server started ready on \n\t - http://${this.options.listenHost}:${this.options.listenPort} \n\t - Tooks ${elapsedTimeInMs}ms`)
}
}

View File

@ -0,0 +1,9 @@
import createClient from "comty.js"
export default (params = {}) => {
return createClient({
...params,
accessKey: process.env.COMTY_ACCESS_KEY,
privateKey: process.env.COMTY_PRIVATE_KEY,
})
}

View File

@ -0,0 +1,58 @@
import mongoose from "mongoose"
function getConnectionConfig(obj) {
const { DB_USER, DB_DRIVER, DB_NAME, DB_PWD, DB_HOSTNAME, DB_PORT } = obj
let auth = [
DB_DRIVER ?? "mongodb",
"://",
]
if (DB_USER && DB_PWD) {
auth.push(`${DB_USER}:${DB_PWD}@`)
}
auth.push(DB_HOSTNAME ?? "localhost")
auth.push(`:${DB_PORT ?? "27017"}`)
if (DB_USER) {
auth.push("/?authMechanism=DEFAULT")
}
auth = auth.join("")
return [
auth,
{
dbName: DB_NAME,
useNewUrlParser: true,
useUnifiedTopology: true,
}
]
}
export default class DBManager {
initialize = async (config) => {
console.log("🔌 Connecting to DB...")
const dbConfig = getConnectionConfig(config ?? process.env)
mongoose.set("strictQuery", false)
const connection = await mongoose.connect(...dbConfig)
.catch((err) => {
console.log(`❌ Failed to connect to DB, retrying...\n`)
console.log(error)
// setTimeout(() => {
// this.initialize()
// }, 1000)
return false
})
if (connection) {
console.log(`✅ Connected to DB.`)
}
}
}

View File

@ -0,0 +1,44 @@
import { createClient } from "redis"
function composeURL() {
// support for auth
let url = "redis://"
if (process.env.REDIS_PASSWORD && process.env.REDIS_USERNAME) {
url += process.env.REDIS_USERNAME + ":" + process.env.REDIS_PASSWORD + "@"
}
url += process.env.REDIS_HOST ?? "localhost"
if (process.env.REDIS_PORT) {
url += ":" + process.env.REDIS_PORT
}
return url
}
export default () => {
let client = createClient({
url: composeURL(),
})
client.initialize = async () => {
console.log("🔌 Connecting to Redis client...")
await client.connect()
return client
}
// handle when client disconnects unexpectedly to avoid main crash
client.on("error", (error) => {
console.error("❌ Redis client error:", error)
})
// handle when client connects
client.on("connect", () => {
console.log("✅ Redis client connected.")
})
return client
}

View File

@ -0,0 +1,97 @@
const Minio = require("minio")
import path from "path"
export const generateDefaultBucketPolicy = (payload) => {
const { bucketName } = payload
if (!bucketName) {
throw new Error("bucketName is required")
}
return {
Version: "2012-10-17",
Statement: [
{
Action: [
"s3:GetObject"
],
Effect: "Allow",
Principal: {
AWS: [
"*"
]
},
Resource: [
`arn:aws:s3:::${bucketName}/*`
],
Sid: ""
}
]
}
}
export class StorageClient extends Minio.Client {
constructor(options) {
super(options)
this.defaultBucket = String(options.defaultBucket)
this.defaultRegion = String(options.defaultRegion)
}
composeRemoteURL = (key) => {
const _path = path.join(this.defaultBucket, key)
return `${this.protocol}//${this.host}:${this.port}/${_path}`
}
setDefaultBucketPolicy = async (bucketName) => {
const policy = generateDefaultBucketPolicy({ bucketName })
return this.setBucketPolicy(bucketName, JSON.stringify(policy))
}
initialize = async () => {
console.log("🔌 Checking if storage client have default bucket...")
// check connection with s3
const bucketExists = await this.bucketExists(this.defaultBucket).catch(() => {
return false
})
if (!bucketExists) {
console.warn("🪣 Default bucket not exists! Creating new bucket...")
await this.makeBucket(this.defaultBucket, "s3")
// set default bucket policy
await this.setDefaultBucketPolicy(this.defaultBucket)
}
// check if default bucket policy exists
const bucketPolicy = await this.getBucketPolicy(this.defaultBucket).catch(() => {
return null
})
if (!bucketPolicy) {
// set default bucket policy
await this.setDefaultBucketPolicy(this.defaultBucket)
}
console.log("✅ Storage client is ready.")
}
}
export const createStorageClientInstance = (options) => {
return new StorageClient({
...options,
endPoint: process.env.S3_ENDPOINT,
port: Number(process.env.S3_PORT),
useSSL: toBoolean(process.env.S3_USE_SSL),
accessKey: process.env.S3_ACCESS_KEY,
secretKey: process.env.S3_SECRET_KEY,
defaultBucket: process.env.S3_BUCKET,
defaultRegion: process.env.S3_REGION,
})
}
export default createStorageClientInstance

25
packages/music_server/src/index.js Normal file → Executable file
View File

@ -1,5 +1,24 @@
require("dotenv").config()
if (typeof process.env.NODE_ENV === "undefined") {
process.env.NODE_ENV = "development"
}
global.isProduction = process.env.NODE_ENV === "production"
import path from "path"
import { registerBaseAliases } from "linebridge/dist/server"
const customAliases = {
"@services": path.resolve(__dirname, "services"),
}
if (!global.isProduction) {
customAliases["comty.js"] = path.resolve(__dirname, "../../comty.js/src")
}
registerBaseAliases(undefined, customAliases)
// patches
const { Buffer } = require("buffer")
@ -32,16 +51,12 @@ global.toBoolean = (value) => {
return false
}
import pkg from "../package.json"
import API from "./api"
async function main() {
const api = new API()
console.log(`\n▶️ Initializing ${pkg.name} ...\n`)
const init = await api.initialize()
console.log(`\n🚀 ${pkg.name} v${pkg.version} is running on port ${init.listenPort}.\n`)
await api.initialize()
}
main().catch((error) => {

View File

@ -1,5 +0,0 @@
export const errorHandler = (error, req, res, next) => {
res.json({ error: error.message })
}
export default errorHandler

View File

@ -1,30 +0,0 @@
export const hasPermissions = (req, res, next) => {
if (typeof (req.userData) == "undefined") {
return res.status(403).json(`User data is not available, please ensure if you are authenticated`)
}
const { _id, username, roles } = req.userData
const { permissions } = req.body
req.userPermissions = roles
let check = []
if (Array.isArray(permissions)) {
check = permissions
} else {
check.push(permissions)
}
if (check.length > 0) {
check.forEach((role) => {
if (!roles.includes(role)) {
return res.status(403).json(`${username} not have permissions ${permissions}`)
}
})
}
next()
}
export default hasPermissions

View File

@ -1,12 +0,0 @@
// const fileUpload = require("@nanoexpress/middleware-file-upload/cjs")()
export { default as withAuthentication } from "./withAuthentication"
export { default as withOptionalAuthentication } from "./withOptionalAuthentication"
export { default as errorHandler } from "./errorHandler"
export { default as hasPermissions } from "./hasPermissions"
export { default as roles } from "./roles"
export { default as onlyAdmin } from "./onlyAdmin"
export { default as permissions } from "./permissions"
// export { fileUpload as fileUpload }

View File

@ -1,7 +0,0 @@
export default (req, res, next) => {
if (!req.user.roles.includes("admin")) {
return res.status(403).json({ error: "To make this request it is necessary to have administrator permissions" })
}
next()
}

View File

@ -1,39 +0,0 @@
import { Config } from "../../models"
export default (req, res, next) => {
const requestedPath = `${req.method.toLowerCase()}${req.path.toLowerCase()}`
Config.findOne({ key: "permissions" }, undefined, {
lean: true,
}).then(({ value }) => {
req.assertedPermissions = []
const pathRoles = value.pathRoles ?? {}
if (typeof pathRoles[requestedPath] === "undefined") {
console.warn(`[Permissions] No permissions defined for path ${requestedPath}`)
return next()
}
const requiredRoles = Array.isArray(pathRoles[requestedPath]) ? pathRoles[requestedPath] : [pathRoles[requestedPath]]
requiredRoles.forEach((role) => {
if (req.user.roles.includes(role)) {
req.assertedPermissions.push(role)
}
})
if (req.user.roles.includes("admin")) {
req.assertedPermissions.push("admin")
}
if (req.assertedPermissions.length === 0 && !req.user.roles.includes("admin")) {
return res.status(403).json({
error: "forbidden",
message: "You don't have permission to access this resource",
})
}
next()
})
}

View File

@ -1,19 +0,0 @@
export default (req, res, next) => {
req.isAdmin = () => {
if (req.user.roles.includes("admin")) {
return true
}
return false
}
req.hasRole = (role) => {
if (req.user.roles.includes(role)) {
return true
}
return false
}
next()
}

View File

@ -1,83 +0,0 @@
import { Session, User } from "../../models"
import { Token } from "../../lib"
import jwt from "jsonwebtoken"
export default (req, res, next) => {
function reject(description) {
return res.status(403).json({ error: `${description ?? "Invalid session"}` })
}
const authHeader = req.headers?.authorization?.split(" ")
if (authHeader && authHeader[0] === "Bearer") {
const token = authHeader[1]
let decoded = null
try {
decoded = jwt.decode(token)
} catch (error) {
console.error(error)
}
if (!decoded) {
return reject("Cannot decode token")
}
jwt.verify(token, global.jwtStrategy.secretOrKey, async (err) => {
const sessions = await Session.find({ user_id: decoded.user_id })
const currentSession = sessions.find((session) => session.token === token)
if (!currentSession) {
return reject("Cannot find session")
}
const userData = await User.findOne({ _id: currentSession.user_id }).select("+refreshToken")
if (!userData) {
return res.status(404).json({ error: "No user data found" })
}
// if cannot verify token, start regeneration process
if (err) {
// first check if token is only expired, if is corrupted, reject
if (err.name !== "TokenExpiredError") {
return reject("Invalid token, cannot regenerate")
}
let regenerationToken = null
// check if this expired token has a regeneration token associated
const associatedRegenerationToken = await Token.getRegenerationToken(token)
if (associatedRegenerationToken) {
regenerationToken = associatedRegenerationToken.refreshToken
} else {
// create a new regeneration token with the expired token
regenerationToken = await Token.createNewRegenerationToken(token).catch((error) => {
// in case of error, reject
reject(error.message)
return null
})
}
if (!regenerationToken) return
// now send the regeneration token to the client (start Expired Exception Event [EEE])
return res.status(401).json({
error: "Token expired",
refreshToken: regenerationToken.refreshToken,
})
}
req.user = userData
req.jwtToken = token
req.decodedToken = decoded
req.currentSession = currentSession
return next()
})
} else {
return reject("Missing token header")
}
}

View File

@ -1,9 +0,0 @@
import withAuthentication from "../withAuthentication"
export default (req, res, next) => {
if (req.headers?.authorization) {
withAuthentication(req, res, next)
} else {
next()
}
}

View File

@ -0,0 +1,55 @@
export default async (socket, next) => {
try {
const token = socket.handshake.auth.token
if (!token) {
return next(new Error(`auth:token_missing`))
}
const validation = await global.comty.rest.session.validateToken(token).catch((err) => {
console.error(`[${socket.id}] failed to validate session caused by server error`, err)
return {
valid: false,
error: err,
}
})
if (!validation.valid) {
if (validation.error) {
return next(new Error(`auth:server_error`))
}
return next(new Error(`auth:token_invalid`))
}
const session = validation.session
const userData = await global.comty.rest.user.data({
user_id: session.user_id,
}).catch((err) => {
console.error(`[${socket.id}] failed to get user data caused by server error`, err)
return null
})
if (!userData) {
return next(new Error(`auth:user_failed`))
}
try {
socket.userData = userData
socket.token = token
socket.session = session
}
catch (err) {
return next(new Error(`auth:decode_failed`))
}
next()
} catch (error) {
console.error(`[${socket.id}] failed to connect caused by server error`, error)
next(new Error(`auth:authentification_failed`))
}
}

View File

@ -0,0 +1,443 @@
import socketio from "socket.io"
import withWsAuth from "@middlewares/withWsAuth"
function generateFnHandler(fn, socket) {
return async (...args) => {
if (typeof socket === "undefined") {
socket = arguments[0]
}
try {
fn(socket, ...args)
} catch (error) {
console.error(`[HANDLER_ERROR] ${error.message} >`, error.stack)
if (typeof socket.emit !== "function") {
return false
}
return socket.emit("error", {
message: error.message,
})
}
}
}
function composePayloadData(socket, data) {
return {
selfUser: {
user_id: socket.userData._id,
username: socket.userData.username,
fullName: socket.userData.fullName,
avatar: socket.userData.avatar,
},
...data
}
}
class Room {
constructor(io, roomId, roomOptions = { title: "Untitled Room" }) {
if (!io) {
throw new Error("io is required")
}
this.io = io
this.roomId = roomId
this.roomOptions = roomOptions
}
ownerUserId = null
connections = []
limitations = {
maxConnections: 10,
}
currentState = null
events = {
"music:player:start": (socket, data) => {
// dispached when someone start playing a new track
// if not owner, do nothing
if (socket.userData._id !== this.ownerUserId) {
return false
}
this.io.to(this.roomId).emit("music:player:start", composePayloadData(socket, data))
},
"music:player:seek": (socket, data) => {
// dispached when someone seek the track
// if not owner, do nothing
if (socket.userData._id !== this.ownerUserId) {
return false
}
this.io.to(this.roomId).emit("music:player:seek", composePayloadData(socket, data))
},
"music:player:loading": () => {
// TODO: Softmode and Hardmode
// sync with current state, seek if needed (if is not owner)
},
"music:player:status": (socket, data) => {
if (socket.userData._id !== this.ownerUserId) {
return false
}
this.io.to(this.roomId).emit("music:player:status", composePayloadData(socket, data))
},
"music:owner:state_update": (socket, data) => {
if (socket.userData._id !== this.ownerUserId) {
return false
}
this.currentState = data
}
}
join = (socket) => {
// set connected room name
socket.connectedRoomId = this.roomId
// join room
socket.join(this.roomId)
// add to connections
this.connections.push(socket)
// emit to self
socket.emit("room:joined", this.composeRoomData())
// emit to others
this.io.to(this.roomId).emit("room:user:joined", {
user: {
user_id: socket.userData._id,
username: socket.userData.username,
fullName: socket.userData.fullName,
avatar: socket.userData.avatar,
}
})
// register events
for (const [event, fn] of Object.entries(this.events)) {
const handler = generateFnHandler(fn, socket)
if (!Array.isArray(socket.handlers)) {
socket.handlers = []
}
socket.handlers.push([event, handler])
socket.on(event, handler)
}
// send current state
this.sendRoomData()
console.log(`[${socket.id}][@${socket.userData.username}] joined room ${this.roomId}`)
}
leave = (socket) => {
// if not connected to any room, do nothing
if (!socket.connectedRoomId) {
console.warn(`[${socket.id}][@${socket.userData.username}] not connected to any room`)
return
}
// if not connected to this room, do nothing
if (socket.connectedRoomId !== this.roomId) {
console.warn(`[${socket.id}][@${socket.userData.username}] not connected to room ${this.roomId}, cannot leave`)
return false
}
// leave room
socket.leave(this.roomId)
// remove from connections
const connIndex = this.connections.findIndex((socket_conn) => socket_conn.id === socket.id)
if (connIndex !== -1) {
this.connections.splice(connIndex, 1)
}
// remove connected room name
socket.connectedRoomId = null
// emit to self
socket.emit("room:left", this.composeRoomData())
// emit to others
this.io.to(this.roomId).emit("room:user:left", {
user: {
user_id: socket.userData._id,
username: socket.userData.username,
fullName: socket.userData.fullName,
avatar: socket.userData.avatar,
},
})
// unregister events
for (const [event, handler] of socket.handlers) {
socket.off(event, handler)
}
// send current state
this.sendRoomData()
console.log(`[${socket.id}][@${socket.userData.username}] left room ${this.roomId}`)
}
composeRoomData = () => {
return {
roomId: this.roomId,
limitations: this.limitations,
ownerUserId: this.ownerUserId,
options: this.roomOptions,
connectedUsers: this.connections.map((socket_conn) => {
return {
user_id: socket_conn.userData._id,
username: socket_conn.userData.username,
fullName: socket_conn.userData.fullName,
avatar: socket_conn.userData.avatar,
}
}),
currentState: this.currentState,
}
}
sendRoomData = () => {
this.io.to(this.roomId).emit("room:current-data", this.composeRoomData())
}
transferOwner = (socket) => {
if (!socket || !socket.userData) {
console.warn(`[${socket.id}] cannot transfer owner for room [${this.roomId}], no user data`)
return false
}
this.ownerUserId = socket.userData._id
console.log(`[${socket.id}][@${socket.userData.username}] is now the owner of the room [${this.roomId}]`)
this.io.to(this.roomId).emit("room:owner:changed", {
ownerUserId: this.ownerUserId,
})
}
destroy = () => {
for (const socket of this.connections) {
this.leave(socket)
}
this.connections = []
this.io.to(this.roomId).emit("room:destroyed", {
room: this.roomId,
})
console.log(`Room ${this.roomId} destroyed`)
}
makeOwner = (socket) => {
this.ownerUserId = socket.userData._id
}
}
class RoomsController {
constructor(io) {
if (!io) {
throw new Error("io is required")
}
this.io = io
}
rooms = []
checkRoomExists = (roomId) => {
return this.rooms.some((room) => room.roomId === roomId)
}
createRoom = async (roomId, roomOptions) => {
if (this.checkRoomExists(roomId)) {
throw new Error(`Room ${roomId} already exists`)
}
const room = new Room(this.io, roomId, roomOptions)
this.rooms.push(room)
return room
}
connectSocketToRoom = async (socket, roomId, roomOptions) => {
let room = null
if (!this.checkRoomExists(roomId)) {
room = await this.createRoom(roomId, roomOptions)
// make owner
room.makeOwner(socket)
}
// check if user is already connected to a room
if (socket.connectedRoomId) {
console.warn(`[${socket.id}][@${socket.userData.username}] already connected to room ${socket.connectedRoomId}`)
this.disconnectSocketFromRoom(socket)
}
if (!room) {
room = this.rooms.find((room) => room.roomId === roomId)
}
return room.join(socket)
}
disconnectSocketFromRoom = async (socket, roomId) => {
if (!roomId) {
roomId = socket.connectedRoomId
}
if (!this.checkRoomExists(roomId)) {
console.warn(`Cannot disconnect socket [${socket.id}][@${socket.userData.username}] from room ${roomId}, room does not exists`)
return false
}
const room = this.rooms.find((room) => room.roomId === roomId)
// if owners leaves, rotate owner to the next user
if (socket.userData._id === room.ownerUserId) {
if (room.connections.length > 0 && room.connections[1]) {
room.transferOwner(room.connections[1])
}
}
// leave
room.leave(socket)
// if room is empty, destroy it
if (room.connections.length === 0) {
await this.destroyRoom(roomId)
return true
}
return true
}
destroyRoom = async (roomId) => {
if (!this.checkRoomExists(roomId)) {
throw new Error(`Room ${roomId} does not exists`)
}
const room = this.rooms.find((room) => room.roomId === roomId)
room.destroy()
this.rooms.splice(this.rooms.indexOf(room), 1)
return true
}
}
export default class RoomsServer {
constructor(server) {
this.io = socketio(server, {
cors: {
origin: "*",
methods: ["GET", "POST"],
credentials: true,
}
})
this.RoomsController = new RoomsController(this.io)
}
connectionPool = []
events = {
"connection": (socket) => {
console.log(`[${socket.id}][${socket.userData.username}] connected to hub.`)
this.connectionPool.push(socket)
socket.on("disconnect", () => this.events.disconnect)
// Rooms
socket.on("join:room", (data) => this.RoomsController.connectSocketToRoom(socket, data.room, data.options))
socket.on("leave:room", (data) => this.RoomsController.disconnectSocketFromRoom(socket, data?.room ?? socket.connectedRoomId, data?.options ?? {}))
socket.on("invite:user", generateFnHandler(this.inviteUserToRoom, socket))
socket.on("ping", (callback) => {
callback()
})
socket.on("disconnect", (_socket) => {
console.log(`[${socket.id}][@${socket.userData.username}] disconnected to hub.`)
if (socket.connectedRoomId) {
console.log(`[${socket.id}][@${socket.userData.username}] was connected to room [${socket.connectedRoomId}], leaving...`)
this.RoomsController.disconnectSocketFromRoom(socket)
}
// remove from connection pool
this.connectionPool = this.connectionPool.filter((client) => client.id !== socket.id)
})
},
}
inviteUserToRoom = async (socket, data) => {
try {
// find sockets with matching user_id
const invitedSockets = this.connectionPool.filter((client) => client.userData._id === data.user_id)
if (invitedSockets.length === 0) {
console.warn(`[${socket.id}][@${socket.userData.username}] cannot invite user ${data.user_id}, user not found in connection pool`)
return socket.emit("error", {
message: `User ${data.user_id} not found`,
})
}
for (const invitedSocket of invitedSockets) {
// check if user is already connected to the room
if (invitedSocket.connectedRoomId === data.roomId) {
console.warn(`[${socket.id}][@${socket.userData.username}] cannot invite user ${data.user_id}, user already connected to room ${data.roomId}`)
return false
}
console.log(`[${socket.id}][@${socket.userData.username}] inviting user ${data.user_id} to room ${data.roomId}`)
invitedSocket.emit("invite:received", {
roomId: data.roomId,
invitedBy: {
_id: socket.userData._id,
username: socket.userData.username,
fullName: socket.userData.fullName,
avatar: socket.userData.avatar,
},
})
}
} catch (error) {
return socket.emit("error", {
message: error.message,
})
}
}
initialize = async () => {
this.io.use(withWsAuth)
Object.entries(this.events).forEach(([event, handler]) => {
this.io.on(event, (socket) => {
try {
handler(socket)
} catch (error) {
console.error(error)
}
})
})
}
}

View File

@ -0,0 +1,8 @@
import cors from "cors"
export default cors({
origin: "*",
methods: ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "CONNECT", "TRACE"],
preflightContinue: false,
optionsSuccessStatus: 204,
})

View File

@ -0,0 +1,14 @@
export default (req, res, next) => {
const startHrTime = process.hrtime()
res.on("finish", () => {
const elapsedHrTime = process.hrtime(startHrTime)
const elapsedTimeInMs = elapsedHrTime[0] * 1000 + elapsedHrTime[1] / 1e6
res._responseTimeMs = elapsedTimeInMs
console.log(`${req.method} ${res._status_code ?? res.statusCode ?? 200} ${req.url} ${elapsedTimeInMs}ms`)
})
next()
}