improve streaming controller endpoints

This commit is contained in:
SrGooglo 2023-04-10 16:14:41 +00:00
parent 91b256f0a4
commit cfb8a92cd3
15 changed files with 291 additions and 290 deletions

View File

@ -0,0 +1,42 @@
import { StreamingProfile } from "@models"
export default {
method: "DELETE",
route: "/streaming/profile",
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const user_id = req.user._id.toString()
const { profile_id } = req.body
if (!profile_id) {
return res.status(400).json({
error: "Invalid request, missing profile_id"
})
}
// search for existing profile
let currentProfile = await StreamingProfile.findOne({
_id: profile_id,
})
if (!currentProfile) {
return res.status(400).json({
error: "Invalid request, profile not found"
})
}
// check if the profile belongs to the user
if (currentProfile.user_id !== user_id) {
return res.status(400).json({
error: "Invalid request, profile does not belong to the user"
})
}
// delete the profile
await currentProfile.delete()
return res.json({
success: true
})
}
}

View File

@ -1,25 +0,0 @@
export default {
method: "GET",
route: "/streaming/addresses",
middlewares: ["withOptionalAuthentication"],
fn: async (req, res) => {
const addresses = {
api: process.env.STREAMING_API_SERVER,
ingest: process.env.STREAMING_INGEST_SERVER,
}
if (req.user) {
addresses.ingestURL = `${addresses.ingest}/${req.user.username}`
addresses.liveURL = `${addresses.api}/live/${req.user.username}`
addresses.radioURL = `${addresses.api}/radio/${req.user.username}`
addresses.hlsURL = `${addresses.liveURL}/src.m3u8`
addresses.flvURL = `${addresses.liveURL}/src.flv`
addresses.aacURL = `${addresses.radioURL}/src.aac`
}
return res.json(addresses)
}
}

View File

@ -1,38 +0,0 @@
import { StreamingInfo, User } from "@models"
export default {
method: "GET",
route: "/stream/info",
middleware: ["withAuthentication"],
fn: async (req, res) => {
let user_id = req.query.user_id
if (!req.query.username && !req.query.user_id) {
return res.status(400).json({
error: "Invalid request, missing username"
})
}
if (!user_id) {
const user = await User.findOne({
username: req.query.username,
})
user_id = user._id.toString()
}
let info = await StreamingInfo.findOne({
user_id,
})
if (!info) {
info = new StreamingInfo({
user_id,
})
await info.save()
}
return res.json(info.toObject())
}
}

View File

@ -1,31 +0,0 @@
import { StreamingKey } from "@models"
import generateStreamingKey from "../services/generateStreamingKey"
export default {
method: "GET",
route: "/streaming/key",
middlewares: ["withAuthentication"],
fn: async (req, res) => {
let streamingKey = await StreamingKey.findOne({
user_id: req.user._id.toString()
})
if (!streamingKey) {
const newKey = await generateStreamingKey(req.user._id.toString()).catch(err => {
res.status(500).json({
error: `Cannot generate a new key: ${err.message}`,
})
return false
})
if (!newKey) {
return false
}
return res.json(newKey)
} else {
return res.json(streamingKey)
}
}
}

View File

@ -0,0 +1,53 @@
import { StreamingProfile } from "@models"
import NewStreamingProfile from "@services/newStreamingProfile"
export default {
method: "GET",
route: "/streaming/profiles",
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const user_id = req.user._id.toString()
if (!user_id) {
return res.status(400).json({
error: "Invalid request, missing user_id"
})
}
let profiles = await StreamingProfile.find({
user_id,
}).select("+stream_key")
if (profiles.length === 0) {
// create a new profile
const profile = await NewStreamingProfile({
user_id,
profile_name: "default",
})
profiles = [profile]
}
profiles = profiles.map((profile) => {
profile = profile.toObject()
profile._id = profile._id.toString()
return profile
})
profiles = profiles.map((profile) => {
profile.addresses = {
ingest: `${process.env.STREAMING_INGEST_SERVER}/${req.user.username}:${profile._id}`,
hls: `${process.env.STREAMING_API_SERVER}/live/${req.user.username}:${profile._id}/src.m3u8`,
flv: `${process.env.STREAMING_API_SERVER}/live/${req.user.username}:${profile._id}/src.flv`,
dash: `${process.env.STREAMING_API_SERVER}/live/${req.user.username}:${profile._id}/src.mpd`,
aac: `${process.env.STREAMING_API_SERVER}/radio/${req.user.username}:${profile._id}/src.aac`,
}
return profile
})
return res.json(profiles)
}
}

View File

@ -4,10 +4,8 @@ export default {
method: "GET",
route: "/streams",
fn: async (req, res) => {
const remoteStreams = await fetchStreamsFromAPI()
if (req.query.username) {
const stream = remoteStreams.find((stream) => stream.username === req.query.username)
const stream = await fetchStreamsFromAPI(`live/${req.query.username}${req.query.profile_id ? `:${req.query.profile_id}` : ""}`)
if (!stream) {
return res.status(404).json({
@ -16,8 +14,10 @@ export default {
}
return res.json(stream)
}
} else {
const streams = await fetchStreamsFromAPI()
return res.json(remoteStreams)
return res.json(streams)
}
}
}

View File

@ -1,27 +1,49 @@
import generateStreamDataFromStreamingKey from "../services/generateStreamDataFromStreamingKey"
// This endpoint is used by the streaming server to check if a stream is valid and to notify the clients that a stream has started
import { StreamingProfile, User } from "@models"
export default {
method: "POST",
route: "/stream/publish",
fn: async (req, res) => {
const { stream } = req.body
const { stream, app } = req.body
const streaming = await generateStreamDataFromStreamingKey(stream).catch((err) => {
console.error(err)
res.status(500).json({
error: `Cannot generate stream: ${err.message}`,
const streamingProfile = await StreamingProfile.findOne({
stream_key: stream
})
return null
if (!streamingProfile) {
return res.status(404).json({
error: "Streaming profile not found",
})
}
if (streaming) {
global.websocket_instance.io.emit(`streaming.new`, streaming)
const user = await User.findById(streamingProfile.user_id)
global.websocket_instance.io.emit(`streaming.new.${streaming.username}`, streaming)
if (!user) {
return res.status(404).json({
code: 1,
error: "User not found",
})
}
const [username, profile_id] = app.split("/")[1].split(":")
if (user.username !== username) {
return res.status(403).json({
code: 1,
error: "Invalid mount point, username does not match with the stream key",
})
}
if (streamingProfile._id.toString() !== profile_id) {
return res.status(403).json({
code: 1,
error: "Invalid mount point, profile id does not match with the stream key",
})
}
global.websocket_instance.io.emit(`streaming.new`, streamingProfile)
global.websocket_instance.io.emit(`streaming.new.${streamingProfile.user_id}`, streamingProfile)
return res.json({
code: 0,
@ -29,4 +51,3 @@ export default {
})
}
}
}

View File

@ -1,4 +1,4 @@
import generateStreamDataFromStreamingKey from "../services/generateStreamDataFromStreamingKey"
import { StreamingProfile } from "@models"
export default {
method: "POST",
@ -6,21 +6,24 @@ export default {
fn: async (req, res) => {
const { stream } = req.body
const streaming = await generateStreamDataFromStreamingKey(stream).catch((err) => {
console.error(err)
return null
const streamingProfile = await StreamingProfile.findOne({
stream_key: stream
})
if (streaming) {
global.websocket_instance.io.emit(`streaming.end`, streaming)
if (streamingProfile) {
global.websocket_instance.io.emit(`streaming.end`, streamingProfile)
global.websocket_instance.io.emit(`streaming.end.${streaming.username}`, streaming)
global.websocket_instance.io.emit(`streaming.end.${streamingProfile.user_id}`, streamingProfile)
return res.json({
code: 0,
status: "ok"
})
}
return res.json({
code: 0,
status: "ok, but no streaming profile found"
})
}
}

View File

@ -0,0 +1,60 @@
import { StreamingProfile } from "@models"
import NewStreamingProfile from "@services/newStreamingProfile"
export default {
method: "POST",
route: "/streaming/profile",
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const user_id = req.user._id.toString()
if (!user_id) {
return res.status(400).json({
error: "Invalid request, missing user_id"
})
}
const {
profile_id,
profile_name,
info,
options,
} = req.body
if (!profile_id && !profile_name) {
return res.status(400).json({
error: "Invalid request, missing profile_id and profile_name"
})
}
// search for existing profile
let currentProfile = await StreamingProfile.findOne({
_id: profile_id,
})
if (currentProfile && profile_id) {
// update the profile
currentProfile.profile_name = profile_name
currentProfile.info = info
currentProfile.options = options
await currentProfile.save()
} else {
if (!profile_name) {
return res.status(400).json({
error: "Invalid request, missing profile_name"
})
}
// create a new profile
currentProfile = await NewStreamingProfile({
user_id,
profile_name,
info,
options,
})
}
return res.json(currentProfile)
}
}

View File

@ -1,35 +1,37 @@
import { StreamingKey } from "@models"
import generateStreamingKey from "../services/generateStreamingKey"
import { StreamingProfile } from "@models"
export default {
method: "POST",
route: "/streaming/key/regenerate",
route: "/streaming/regenerate_key",
middlewares: ["withAuthentication"],
fn: async (req, res) => {
// check if the user already has a key
let streamingKey = await StreamingKey.findOne({
user_id: req.user._id.toString()
const { profile_id } = req.body
if (!profile_id) {
return res.status(400).json({
message: "Missing profile_id"
})
// if exists, delete it
if (streamingKey) {
await streamingKey.remove()
}
// generate a new key
const newKey = await generateStreamingKey(req.user._id.toString()).catch(err => {
res.status(500).json({
error: `Cannot generate a new key: ${err.message}`,
const profile = await StreamingProfile.findById(profile_id)
if (!profile) {
return res.status(404).json({
message: "Profile not found"
})
}
return false
// check if profile user is the same as the user in the request
if (profile.user_id !== req.user._id.toString()) {
return res.status(403).json({
message: "You are not allowed to regenerate this key"
})
if (!newKey) {
return false
}
return res.json(newKey)
profile.stream_key = global.nanoid()
await profile.save()
return res.json(profile.toObject())
}
}

View File

@ -1,30 +0,0 @@
import handleStreamInfoUpdate from "../services/handleStreamInfoUpdate"
export default {
method: "POST",
route: "/stream/info",
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const { title, description, category, thumbnail } = req.body
const info = await handleStreamInfoUpdate({
user_id: req.user._id.toString(),
title,
description,
category,
thumbnail
}).catch((err) => {
console.error(err)
res.status(500).json({
error: `Cannot update info: ${err.message}`,
})
return null
})
if (info) {
return res.json(info)
}
}
}

View File

@ -1,18 +1,20 @@
import axios from "axios"
import lodash from "lodash"
import { StreamingCategory, StreamingInfo } from "@models"
import generateStreamDataFromStreamingKey from "./generateStreamDataFromStreamingKey"
import { StreamingCategory, StreamingProfile, User } from "@models"
const streamingServerAPIAddress = process.env.STREAMING_API_SERVER ?? ""
const streamingServerAPIUri = `${streamingServerAPIAddress.startsWith("https") ? "https" : "http"}://${streamingServerAPIAddress.split("://")[1]}`
const FILTER_KEYS = ["stream"]
export default async (stream_id) => {
let apiURI = `${streamingServerAPIUri}/api/v1/streams`
if (stream_id) {
apiURI = `${streamingServerAPIUri}/api/v1/streams/${stream_id}`
}
export default async () => {
// fetch all streams from api
let { data } = await axios.get(`${streamingServerAPIUri}/api/v1/streams`).catch((err) => {
let { data } = await axios.get(apiURI).catch((err) => {
console.error(err)
return false
})
@ -21,35 +23,61 @@ export default async () => {
if (!data) return streamings
streamings = data.streams
streamings = streamings.map(async (stream) => {
const { video, audio, clients } = stream
stream = await generateStreamDataFromStreamingKey(stream.name)
let info = await StreamingInfo.findOne({
user_id: stream.user_id
})
if (info) {
stream.info = info.toObject()
stream.info.category = await StreamingCategory.findOne({
key: stream.info.category
})
if (data.stream && stream_id) {
streamings.push(data.stream)
}
stream.video = video
stream.audio = audio
stream.connectedClients = clients ?? 0
if (data.streams) {
streamings = data.streams
}
return stream
streamings = streamings.map(async (stream) => {
const { video, audio, clients, app } = stream
const profile_id = app.split(":")[1]
let profile = await StreamingProfile.findById(profile_id)
if (!profile) return null
profile = profile.toObject()
profile._id = profile._id.toString()
profile.info.category = await StreamingCategory.findOne({
key: profile.info.category
})
let user = await User.findById(profile.user_id)
if (!user) return null
user = user.toObject()
return {
profile_id: profile._id,
info: profile.info,
stream: `${user.username}?profile=${profile._id}`,
user,
video,
audio,
connectedClients: clients ?? 0,
sources: {
hls: `${streamingServerAPIUri}/live/${user.username}:${profile._id}/src.m3u8`,
flv: `${streamingServerAPIUri}/live/${user.username}:${profile._id}/src.flv`,
dash: `${streamingServerAPIUri}/live/${user.username}:${profile._id}/src.mpd`,
aac: `${streamingServerAPIUri}/radio/${user.username}:${profile._id}/src.aac`,
}
}
})
streamings = await Promise.all(streamings)
return streamings.map((stream) => {
return lodash.omit(stream, FILTER_KEYS)
})
streamings = streamings.filter((stream) => stream !== null)
if (stream_id) {
return streamings[0]
}
return streamings
}

View File

@ -1,25 +0,0 @@
import { StreamingKey } from "@models"
const streamingServerAPIUri = process.env.STREAMING_API_SERVER ? `${process.env.STREAMING_API_SERVER.startsWith("https") ? "https" : "http"}://${process.env.STREAMING_API_SERVER.split("://")[1]}` : "Not available"
export default async (key) => {
// generate a stream from a streamkey
const streamingKey = await StreamingKey.findOne({
key: key
})
if (!streamingKey) return false
const streaming = {
user_id: streamingKey.user_id,
username: streamingKey.username,
sources: {
rtmp: `${process.env.STREAMING_INGEST_SERVER}/live/${streamingKey.username}`,
hls: `${streamingServerAPIUri}/live/${streamingKey.username}/src.m3u8`,
flv: `${streamingServerAPIUri}/live/${streamingKey.username}/src.flv`,
aac: `${streamingServerAPIUri}/radio/${streamingKey.username}/src.aac`,
}
}
return streaming
}

View File

@ -1,19 +0,0 @@
import { StreamingKey, User } from "@models"
export default async (user_id) => {
// this will generate a new key for the user
// if the user already has a key, it will be regenerated
// get username from user_id
const userData = await User.findById(user_id)
const streamingKey = new StreamingKey({
user_id,
username: userData.username,
key: global.nanoid()
})
await streamingKey.save()
return streamingKey
}

View File

@ -1,40 +0,0 @@
import { StreamingInfo } from "@models"
import lodash from "lodash"
export default async (payload) => {
let info = await StreamingInfo.findOne({
user_id: payload.user_id
}).catch((err) => {
return false
})
const payloadValues = {
title: payload.title,
description: payload.description,
category: payload.category,
thumbnail: payload.thumbnail,
}
if (!info) {
// create new info
info = new StreamingInfo({
user_id: payload.user_id,
...payloadValues
})
}
// merge data
info = lodash.merge(info, {
title: payload.title,
description: payload.description,
category: payload.category,
thumbnail: payload.thumbnail,
})
await info.save()
global.websocket_instance.io.emit(`streaming.info_update.${payload.user_id}`, info)
return info
}