added radio subsystem

This commit is contained in:
SrGooglo 2025-02-18 04:23:23 +00:00
parent c1feeb0221
commit 1c27ac1a06
6 changed files with 208 additions and 16 deletions

View File

@ -0,0 +1,22 @@
export default {
name: "RadioProfile",
collection: "radio_profiles",
schema: {
user_id: {
type: String,
required: true,
},
created_at: {
type: Date,
required: true,
},
token: {
type: String,
required: true,
select: false,
},
background: {
type: String,
},
},
}

View File

@ -1,30 +1,33 @@
import { Server } from "linebridge"
import DbManager from "@shared-classes/DbManager"
import SSEManager from "@shared-classes/SSEManager"
import SharedMiddlewares from "@shared-middlewares"
import LimitsClass from "@shared-classes/Limits"
export default class API extends Server {
static refName = "music"
static enableWebsockets = true
static routesPath = `${__dirname}/routes`
static listen_port = process.env.HTTP_LISTEN_PORT ?? 3003
static refName = "music"
static enableWebsockets = true
static routesPath = `${__dirname}/routes`
static listen_port = process.env.HTTP_LISTEN_PORT ?? 3003
middlewares = {
...SharedMiddlewares
}
middlewares = {
...SharedMiddlewares,
}
contexts = {
db: new DbManager(),
limits: {},
}
contexts = {
db: new DbManager(),
SSEManager: new SSEManager(),
}
async onInitialize() {
await this.contexts.db.initialize()
async onInitialize() {
global.sse = this.contexts.SSEManager
this.contexts.limits = await LimitsClass.get()
}
await this.contexts.db.initialize()
this.contexts.limits = await LimitsClass.get()
}
}
Boot(API)
Boot(API)

View File

@ -0,0 +1,9 @@
export default async (req, res) => {
const radioId = req.params.radio_id
let redisData = await global.websocket.redis
.hgetall(`radio-${radioId}`)
.catch(() => null)
return redisData
}

View File

@ -0,0 +1,47 @@
import { RadioProfile } from "@db_models"
async function scanKeysWithPagination(pattern, count = 10, cursor = "0") {
const result = await global.websocket.redis.scan(
cursor,
"MATCH",
pattern,
"COUNT",
count,
)
return result[1]
}
async function getHashData(hashKey) {
const hashData = await global.websocket.redis.hgetall(hashKey)
return hashData
}
export default async (req) => {
const { limit = 50, offset = 0 } = req.query
let result = await scanKeysWithPagination(`radio-*`, limit, String(offset))
const radioIds = result.map((key) => key.split("radio-")[1])
const radioProfiles = await RadioProfile.find({
_id: { $in: radioIds },
})
result = await Promise.all(
result.map(async (key) => {
let data = await getHashData(key)
const profile = radioProfiles
.find((profile) => profile._id.toString() === data.radio_id)
.toObject()
data.now_playing = JSON.parse(data.now_playing)
data.online = ToBoolean(data.online)
return { ...data, ...profile }
}),
)
return result
}

View File

@ -0,0 +1,16 @@
export default async (req, res) => {
const { channel_id } = req.params
const radioId = channel_id.split("radio:")[1]
let redisData = await global.websocket.redis
.hgetall(`radio-${radioId}`)
.catch(() => null)
global.sse.connectToChannelStream(channel_id, req, res, {
initialData: {
event: "update",
data: redisData,
},
})
}

View File

@ -0,0 +1,95 @@
import { RadioProfile } from "@db_models"
function parseBasicAuth(auth) {
if (!auth || typeof auth !== "string") {
throw new Error("No or wrong argument")
}
var result = {},
parts,
decoded,
colon
parts = auth.split(" ")
result.scheme = parts[0]
if (result.scheme !== "Basic") {
return result
}
decoded = new Buffer(parts[1], "base64").toString("utf8")
colon = decoded.indexOf(":")
result.username = decoded.substr(0, colon)
result.password = decoded.substr(colon + 1)
return result
}
export default async (req) => {
if (!req.headers["authorization"]) {
throw new OperationError(401, "Missing authorization header")
}
if (!req.headers["authorization"].startsWith("Basic")) {
throw new OperationError(401, "Invalid authorization type. Use Basic.")
}
const auth = parseBasicAuth(req.headers["authorization"])
const profile = await RadioProfile.find({
_id: auth.username,
}).select("+token")
if (!profile) {
throw new OperationError(404, "Profile with this token not exist")
}
if (profile.token !== auth.token) {
throw new OperationError(401, "Token missmatch")
}
let data = {
radio_id: auth.username,
listeners: req.body.listeners.total,
station_id: req.body.station.id,
name: req.body.station.name,
hls_src: req.body.station.hls_url,
http_src: req.body.station.listen_url,
now_playing: req.body.now_playing,
online: ToBoolean(req.body.is_online),
background: profile.background,
}
const redis_id = `radio-${data.radio_id}`
const existMember = await global.websocket.redis.hexists(
redis_id,
"radio_id",
)
if (data.online) {
await global.websocket.redis.hset(redis_id, {
...data,
now_playing: JSON.stringify(data.now_playing),
})
}
if (!data.online && existMember) {
await global.websocket.redis.hdel(redis_id)
}
console.log(`Updating Radio`, data, {
redis_id,
online: data.online,
existMember,
})
global.sse.sendToChannel(`radio:${data.radio_id}`, {
event: "update",
data: data,
})
global.websocket.io.to(`radio:${data.radio_id}`).emit(`update`, data)
return data
}