improve stream fetch

This commit is contained in:
SrGooglo 2022-11-28 17:27:37 +00:00 committed by root
parent 2f275fc762
commit 46680f21c8

View File

@ -6,17 +6,14 @@ import axios from "axios"
import { Schematized } from "../../lib" import { Schematized } from "../../lib"
import { User, StreamingKey, StreamingInfo, StreamingCategory } from "../../models" import { User, StreamingKey, StreamingInfo, StreamingCategory } from "../../models"
const streamingIngestServer = process.env.STREAMING_INGEST_SERVER const streamingIngestServer = process.env.STREAMING_INGEST_SERVER ?? ""
const streamingServerAPIAddress = process.env.STREAMING_API_SERVER const streamingServerAPIAddress = process.env.STREAMING_API_SERVER ?? ""
const streamingServerAPIProtocol = streamingServerAPIAddress.startsWith("https") ? "https" : "http"
const streamingServerAPIUri = `${streamingServerAPIProtocol}://${streamingServerAPIAddress.split("://")[1]}` const streamingServerAPIUri = `${streamingServerAPIAddress.startsWith("https") ? "https" : "http"}://${streamingServerAPIAddress.split("://")[1]}`
const FILTER_KEYS = ["stream"] const FILTER_KEYS = ["stream"]
export default class StreamingController extends Controller { export default class StreamingController extends Controller {
streamings = []
methods = { methods = {
genereteKey: async (user_id) => { genereteKey: async (user_id) => {
// this will generate a new key for the user // this will generate a new key for the user
@ -35,78 +32,61 @@ export default class StreamingController extends Controller {
return streamingKey return streamingKey
}, },
regenerateStreamingList: async () => { fetchStreams: async () => {
// fetch all streams from api // fetch all streams from api
let streams = await axios.get(`${streamingServerAPIUri}/api/v1/streams`).catch((err) => { let { data } = await axios.get(`${streamingServerAPIUri}/api/v1/streams`).catch((err) => {
console.log(err) console.error(err)
return false return false
}) })
if (streams) { let streamings = []
streams = streams.data.streams
// FIXME: this method is not totally async if (!data) return streamings
streams.forEach((stream) => {
// check if the stream is already in the list
const streamInList = this.streamings.find((s) => s.stream === stream.name)
if (!streamInList) { streamings = data.streams
// if not, add it
this.methods.pushToLocalList({ streamings = streamings.map(async (stream) => {
stream: stream.name, stream = await this.methods.generateStreamFromStreamkey(stream.name)
app: stream.app,
}).catch((err) => { let info = await StreamingInfo.findOne({
// sorry for you user_id: stream.user_id
})
}
}) })
}
},
pushToLocalList: async (payload) => {
const { stream, app } = payload
const username = app.split("/")[1] if (info) {
const user_id = await User.findOne({ username }).then((user) => user._id) stream.info = info.toObject()
const streamingKey = await StreamingKey.findOne({ stream.info.category = await StreamingCategory.findOne({
key: stream key: stream.info.category
})
}
return stream
}) })
if (!streamingKey) { streamings = await Promise.all(streamings)
throw new Error("Invalid streaming key")
}
if (username !== streamingKey.username) { return streamings.map((stream) => {
throw new Error("Invalid streaming key for this username") return lodash.omit(stream, FILTER_KEYS)
} })
},
generateStreamFromStreamkey: async (streamKey) => {
// generate a stream from a streamkey
const streamingKey = await StreamingKey.findOne({
key: streamKey
})
if (!streamingKey) return false
const streaming = { const streaming = {
stream, user_id: streamingKey.user_id,
user_id: user_id.toString(),
username: streamingKey.username, username: streamingKey.username,
sources: { sources: {
rtmp: `${streamingIngestServer}/live/${username}`, rtmp: `${streamingIngestServer}/live/${streamingKey.username}`,
hls: `${streamingServerAPIAddress}/live/${username}/src.m3u8`, hls: `${streamingServerAPIAddress}/live/${streamingKey.username}/src.m3u8`,
flv: `${streamingServerAPIAddress}/live/${username}/src.flv`, flv: `${streamingServerAPIAddress}/live/${streamingKey.username}/src.flv`,
} }
} }
this.streamings.push(streaming)
return streaming
},
removeFromLocalList: async (payload) => {
const { stream } = payload
// remove from streamings array
const streaming = this.streamings.find((streaming) => streaming.stream === stream)
if (!streaming) {
throw new Error("Stream not found")
}
this.streamings = this.streamings.filter((streaming) => streaming.stream !== stream)
return streaming return streaming
}, },
handleInfoUpdate: async (payload) => { handleInfoUpdate: async (payload) => {
@ -154,31 +134,9 @@ export default class StreamingController extends Controller {
return res.json(categories) return res.json(categories)
}, },
"/streams": async (req, res) => { "/streams": async (req, res) => {
await this.methods.regenerateStreamingList() const remoteStreams = await this.methods.fetchStreams()
let data = this.streamings.map((stream) => { return res.json(remoteStreams)
return lodash.omit(stream, FILTER_KEYS)
})
data = data.map(async (stream) => {
let info = await StreamingInfo.findOne({
user_id: stream.user_id
})
if (info) {
stream.info = info.toObject()
stream.info.category = await StreamingCategory.findOne({
key: stream.info.category
})
}
return stream
})
data = await Promise.all(data)
return res.json(data)
}, },
"/stream/info": { "/stream/info": {
middleware: ["withAuthentication"], middleware: ["withAuthentication"],
@ -238,8 +196,10 @@ export default class StreamingController extends Controller {
"/streaming/:username": async (req, res) => { "/streaming/:username": async (req, res) => {
const { username } = req.params const { username } = req.params
const streamings = await this.methods.fetchStreams()
// search on this.streamings // search on this.streamings
const streaming = this.streamings.find((streaming) => streaming.username === username) const streaming = streamings.find((streaming) => streaming.username === username)
if (streaming) { if (streaming) {
return res.json(lodash.omit(streaming, FILTER_KEYS)) return res.json(lodash.omit(streaming, FILTER_KEYS))
@ -335,25 +295,22 @@ export default class StreamingController extends Controller {
} }
}, },
"/streaming/publish": async (req, res) => { "/streaming/publish": async (req, res) => {
const { app, stream, tcUrl } = req.body const { stream } = req.body
const streaming = await this.methods.generateStreamFromStreamkey(stream).catch((err) => {
console.error(err)
const streaming = await this.methods.pushToLocalList({
app,
stream,
tcUrl
}).catch((err) => {
res.status(500).json({ res.status(500).json({
code: 1, error: `Cannot generate stream: ${err.message}`,
error: err.message
}) })
return false return null
}) })
if (streaming) { if (streaming) {
global.wsInterface.io.emit(`streaming.new`, { global.wsInterface.io.emit(`streaming.new`, streaming)
username: streaming.username,
}) global.wsInterface.io.emit(`streaming.new.${streaming.username}`, streaming)
return res.json({ return res.json({
code: 0, code: 0,
@ -364,21 +321,16 @@ export default class StreamingController extends Controller {
"/streaming/unpublish": async (req, res) => { "/streaming/unpublish": async (req, res) => {
const { stream } = req.body const { stream } = req.body
const streaming = await this.methods.removeFromLocalList({ const streaming = await this.methods.generateStreamFromStreamkey(stream).catch((err) => {
stream console.error(err)
}).catch((err) => {
res.status(500).json({
code: 2,
status: err.message
})
return false return null
}) })
if (streaming) { if (streaming) {
global.wsInterface.io.emit(`streaming.end`, { global.wsInterface.io.emit(`streaming.end`, streaming)
username: streaming.username,
}) global.wsInterface.io.emit(`streaming.end.${streaming.username}`, streaming)
return res.json({ return res.json({
code: 0, code: 0,