From 1c27ac1a06004008827d860dc3a32767681cd4db Mon Sep 17 00:00:00 2001 From: SrGooglo Date: Tue, 18 Feb 2025 04:23:23 +0000 Subject: [PATCH] added radio subsystem --- .../server/db_models/radioProfile/index.js | 22 +++++ .../server/services/music/music.service.js | 35 +++---- .../routes/music/radio/[radio_id]/get.js | 9 ++ .../music/routes/music/radio/list/get.js | 47 +++++++++ .../music/radio/sse/[channel_id]/get.js | 16 ++++ .../music/routes/music/radio/webhook/post.js | 95 +++++++++++++++++++ 6 files changed, 208 insertions(+), 16 deletions(-) create mode 100644 packages/server/db_models/radioProfile/index.js create mode 100644 packages/server/services/music/routes/music/radio/[radio_id]/get.js create mode 100644 packages/server/services/music/routes/music/radio/list/get.js create mode 100644 packages/server/services/music/routes/music/radio/sse/[channel_id]/get.js create mode 100644 packages/server/services/music/routes/music/radio/webhook/post.js diff --git a/packages/server/db_models/radioProfile/index.js b/packages/server/db_models/radioProfile/index.js new file mode 100644 index 00000000..95625ad7 --- /dev/null +++ b/packages/server/db_models/radioProfile/index.js @@ -0,0 +1,22 @@ +export default { + name: "RadioProfile", + collection: "radio_profiles", + schema: { + user_id: { + type: String, + required: true, + }, + created_at: { + type: Date, + required: true, + }, + token: { + type: String, + required: true, + select: false, + }, + background: { + type: String, + }, + }, +} diff --git a/packages/server/services/music/music.service.js b/packages/server/services/music/music.service.js index 251a3a30..4e24f38a 100755 --- a/packages/server/services/music/music.service.js +++ b/packages/server/services/music/music.service.js @@ -1,30 +1,33 @@ import { Server } from "linebridge" import DbManager from "@shared-classes/DbManager" +import SSEManager from "@shared-classes/SSEManager" import SharedMiddlewares from "@shared-middlewares" import LimitsClass from "@shared-classes/Limits" export default class API extends Server { - static refName = "music" - static enableWebsockets = true - static routesPath = `${__dirname}/routes` - static listen_port = process.env.HTTP_LISTEN_PORT ?? 3003 + static refName = "music" + static enableWebsockets = true + static routesPath = `${__dirname}/routes` + static listen_port = process.env.HTTP_LISTEN_PORT ?? 3003 - middlewares = { - ...SharedMiddlewares - } + middlewares = { + ...SharedMiddlewares, + } - contexts = { - db: new DbManager(), - limits: {}, - } + contexts = { + db: new DbManager(), + SSEManager: new SSEManager(), + } - async onInitialize() { - await this.contexts.db.initialize() + async onInitialize() { + global.sse = this.contexts.SSEManager - this.contexts.limits = await LimitsClass.get() - } + await this.contexts.db.initialize() + + this.contexts.limits = await LimitsClass.get() + } } -Boot(API) \ No newline at end of file +Boot(API) diff --git a/packages/server/services/music/routes/music/radio/[radio_id]/get.js b/packages/server/services/music/routes/music/radio/[radio_id]/get.js new file mode 100644 index 00000000..90969443 --- /dev/null +++ b/packages/server/services/music/routes/music/radio/[radio_id]/get.js @@ -0,0 +1,9 @@ +export default async (req, res) => { + const radioId = req.params.radio_id + + let redisData = await global.websocket.redis + .hgetall(`radio-${radioId}`) + .catch(() => null) + + return redisData +} diff --git a/packages/server/services/music/routes/music/radio/list/get.js b/packages/server/services/music/routes/music/radio/list/get.js new file mode 100644 index 00000000..c10a069c --- /dev/null +++ b/packages/server/services/music/routes/music/radio/list/get.js @@ -0,0 +1,47 @@ +import { RadioProfile } from "@db_models" + +async function scanKeysWithPagination(pattern, count = 10, cursor = "0") { + const result = await global.websocket.redis.scan( + cursor, + "MATCH", + pattern, + "COUNT", + count, + ) + + return result[1] +} + +async function getHashData(hashKey) { + const hashData = await global.websocket.redis.hgetall(hashKey) + return hashData +} + +export default async (req) => { + const { limit = 50, offset = 0 } = req.query + + let result = await scanKeysWithPagination(`radio-*`, limit, String(offset)) + + const radioIds = result.map((key) => key.split("radio-")[1]) + + const radioProfiles = await RadioProfile.find({ + _id: { $in: radioIds }, + }) + + result = await Promise.all( + result.map(async (key) => { + let data = await getHashData(key) + + const profile = radioProfiles + .find((profile) => profile._id.toString() === data.radio_id) + .toObject() + + data.now_playing = JSON.parse(data.now_playing) + data.online = ToBoolean(data.online) + + return { ...data, ...profile } + }), + ) + + return result +} diff --git a/packages/server/services/music/routes/music/radio/sse/[channel_id]/get.js b/packages/server/services/music/routes/music/radio/sse/[channel_id]/get.js new file mode 100644 index 00000000..6fc963bb --- /dev/null +++ b/packages/server/services/music/routes/music/radio/sse/[channel_id]/get.js @@ -0,0 +1,16 @@ +export default async (req, res) => { + const { channel_id } = req.params + + const radioId = channel_id.split("radio:")[1] + + let redisData = await global.websocket.redis + .hgetall(`radio-${radioId}`) + .catch(() => null) + + global.sse.connectToChannelStream(channel_id, req, res, { + initialData: { + event: "update", + data: redisData, + }, + }) +} diff --git a/packages/server/services/music/routes/music/radio/webhook/post.js b/packages/server/services/music/routes/music/radio/webhook/post.js new file mode 100644 index 00000000..d1213c3c --- /dev/null +++ b/packages/server/services/music/routes/music/radio/webhook/post.js @@ -0,0 +1,95 @@ +import { RadioProfile } from "@db_models" + +function parseBasicAuth(auth) { + if (!auth || typeof auth !== "string") { + throw new Error("No or wrong argument") + } + + var result = {}, + parts, + decoded, + colon + + parts = auth.split(" ") + + result.scheme = parts[0] + if (result.scheme !== "Basic") { + return result + } + + decoded = new Buffer(parts[1], "base64").toString("utf8") + colon = decoded.indexOf(":") + + result.username = decoded.substr(0, colon) + result.password = decoded.substr(colon + 1) + + return result +} + +export default async (req) => { + if (!req.headers["authorization"]) { + throw new OperationError(401, "Missing authorization header") + } + + if (!req.headers["authorization"].startsWith("Basic")) { + throw new OperationError(401, "Invalid authorization type. Use Basic.") + } + + const auth = parseBasicAuth(req.headers["authorization"]) + + const profile = await RadioProfile.find({ + _id: auth.username, + }).select("+token") + + if (!profile) { + throw new OperationError(404, "Profile with this token not exist") + } + + if (profile.token !== auth.token) { + throw new OperationError(401, "Token missmatch") + } + + let data = { + radio_id: auth.username, + listeners: req.body.listeners.total, + station_id: req.body.station.id, + name: req.body.station.name, + hls_src: req.body.station.hls_url, + http_src: req.body.station.listen_url, + now_playing: req.body.now_playing, + online: ToBoolean(req.body.is_online), + background: profile.background, + } + + const redis_id = `radio-${data.radio_id}` + + const existMember = await global.websocket.redis.hexists( + redis_id, + "radio_id", + ) + + if (data.online) { + await global.websocket.redis.hset(redis_id, { + ...data, + now_playing: JSON.stringify(data.now_playing), + }) + } + + if (!data.online && existMember) { + await global.websocket.redis.hdel(redis_id) + } + + console.log(`Updating Radio`, data, { + redis_id, + online: data.online, + existMember, + }) + + global.sse.sendToChannel(`radio:${data.radio_id}`, { + event: "update", + data: data, + }) + global.websocket.io.to(`radio:${data.radio_id}`).emit(`update`, data) + + return data +}