diff --git a/packages/server/src/controllers/StreamingController/index.js b/packages/server/src/controllers/StreamingController/index.js index cd7109dc..2616610a 100755 --- a/packages/server/src/controllers/StreamingController/index.js +++ b/packages/server/src/controllers/StreamingController/index.js @@ -6,17 +6,14 @@ import axios from "axios" import { Schematized } from "../../lib" import { User, StreamingKey, StreamingInfo, StreamingCategory } from "../../models" -const streamingIngestServer = process.env.STREAMING_INGEST_SERVER -const streamingServerAPIAddress = process.env.STREAMING_API_SERVER -const streamingServerAPIProtocol = streamingServerAPIAddress.startsWith("https") ? "https" : "http" +const streamingIngestServer = process.env.STREAMING_INGEST_SERVER ?? "" +const streamingServerAPIAddress = process.env.STREAMING_API_SERVER ?? "" -const streamingServerAPIUri = `${streamingServerAPIProtocol}://${streamingServerAPIAddress.split("://")[1]}` +const streamingServerAPIUri = `${streamingServerAPIAddress.startsWith("https") ? "https" : "http"}://${streamingServerAPIAddress.split("://")[1]}` const FILTER_KEYS = ["stream"] export default class StreamingController extends Controller { - streamings = [] - methods = { genereteKey: async (user_id) => { // this will generate a new key for the user @@ -35,78 +32,61 @@ export default class StreamingController extends Controller { return streamingKey }, - regenerateStreamingList: async () => { + fetchStreams: async () => { // fetch all streams from api - let streams = await axios.get(`${streamingServerAPIUri}/api/v1/streams`).catch((err) => { - console.log(err) + let { data } = await axios.get(`${streamingServerAPIUri}/api/v1/streams`).catch((err) => { + console.error(err) return false }) - if (streams) { - streams = streams.data.streams + let streamings = [] - // FIXME: this method is not totally async - streams.forEach((stream) => { - // check if the stream is already in the list - const streamInList = this.streamings.find((s) => s.stream === stream.name) + if (!data) return streamings - if (!streamInList) { - // if not, add it - this.methods.pushToLocalList({ - stream: stream.name, - app: stream.app, - }).catch((err) => { - // sorry for you - }) - } + streamings = data.streams + + streamings = streamings.map(async (stream) => { + stream = await this.methods.generateStreamFromStreamkey(stream.name) + + let info = await StreamingInfo.findOne({ + user_id: stream.user_id }) - } - }, - pushToLocalList: async (payload) => { - const { stream, app } = payload - const username = app.split("/")[1] - const user_id = await User.findOne({ username }).then((user) => user._id) + if (info) { + stream.info = info.toObject() - const streamingKey = await StreamingKey.findOne({ - key: stream + stream.info.category = await StreamingCategory.findOne({ + key: stream.info.category + }) + } + + return stream }) - if (!streamingKey) { - throw new Error("Invalid streaming key") - } + streamings = await Promise.all(streamings) - if (username !== streamingKey.username) { - throw new Error("Invalid streaming key for this username") - } + return streamings.map((stream) => { + return lodash.omit(stream, FILTER_KEYS) + }) + }, + generateStreamFromStreamkey: async (streamKey) => { + // generate a stream from a streamkey + const streamingKey = await StreamingKey.findOne({ + key: streamKey + }) + + if (!streamingKey) return false const streaming = { - stream, - user_id: user_id.toString(), + user_id: streamingKey.user_id, username: streamingKey.username, sources: { - rtmp: `${streamingIngestServer}/live/${username}`, - hls: `${streamingServerAPIAddress}/live/${username}/src.m3u8`, - flv: `${streamingServerAPIAddress}/live/${username}/src.flv`, + rtmp: `${streamingIngestServer}/live/${streamingKey.username}`, + hls: `${streamingServerAPIAddress}/live/${streamingKey.username}/src.m3u8`, + flv: `${streamingServerAPIAddress}/live/${streamingKey.username}/src.flv`, } } - this.streamings.push(streaming) - - return streaming - }, - removeFromLocalList: async (payload) => { - const { stream } = payload - - // remove from streamings array - const streaming = this.streamings.find((streaming) => streaming.stream === stream) - - if (!streaming) { - throw new Error("Stream not found") - } - - this.streamings = this.streamings.filter((streaming) => streaming.stream !== stream) - return streaming }, handleInfoUpdate: async (payload) => { @@ -154,31 +134,9 @@ export default class StreamingController extends Controller { return res.json(categories) }, "/streams": async (req, res) => { - await this.methods.regenerateStreamingList() + const remoteStreams = await this.methods.fetchStreams() - let data = this.streamings.map((stream) => { - return lodash.omit(stream, FILTER_KEYS) - }) - - data = data.map(async (stream) => { - let info = await StreamingInfo.findOne({ - user_id: stream.user_id - }) - - if (info) { - stream.info = info.toObject() - - stream.info.category = await StreamingCategory.findOne({ - key: stream.info.category - }) - } - - return stream - }) - - data = await Promise.all(data) - - return res.json(data) + return res.json(remoteStreams) }, "/stream/info": { middleware: ["withAuthentication"], @@ -238,8 +196,10 @@ export default class StreamingController extends Controller { "/streaming/:username": async (req, res) => { const { username } = req.params + const streamings = await this.methods.fetchStreams() + // search on this.streamings - const streaming = this.streamings.find((streaming) => streaming.username === username) + const streaming = streamings.find((streaming) => streaming.username === username) if (streaming) { return res.json(lodash.omit(streaming, FILTER_KEYS)) @@ -335,25 +295,22 @@ export default class StreamingController extends Controller { } }, "/streaming/publish": async (req, res) => { - const { app, stream, tcUrl } = req.body + const { stream } = req.body + + const streaming = await this.methods.generateStreamFromStreamkey(stream).catch((err) => { + console.error(err) - const streaming = await this.methods.pushToLocalList({ - app, - stream, - tcUrl - }).catch((err) => { res.status(500).json({ - code: 1, - error: err.message + error: `Cannot generate stream: ${err.message}`, }) - return false + return null }) if (streaming) { - global.wsInterface.io.emit(`streaming.new`, { - username: streaming.username, - }) + global.wsInterface.io.emit(`streaming.new`, streaming) + + global.wsInterface.io.emit(`streaming.new.${streaming.username}`, streaming) return res.json({ code: 0, @@ -364,21 +321,16 @@ export default class StreamingController extends Controller { "/streaming/unpublish": async (req, res) => { const { stream } = req.body - const streaming = await this.methods.removeFromLocalList({ - stream - }).catch((err) => { - res.status(500).json({ - code: 2, - status: err.message - }) + const streaming = await this.methods.generateStreamFromStreamkey(stream).catch((err) => { + console.error(err) - return false + return null }) if (streaming) { - global.wsInterface.io.emit(`streaming.end`, { - username: streaming.username, - }) + global.wsInterface.io.emit(`streaming.end`, streaming) + + global.wsInterface.io.emit(`streaming.end.${streaming.username}`, streaming) return res.json({ code: 0,