mirror of
https://github.com/ragestudio/comty.git
synced 2025-06-11 03:24:16 +00:00
improve stream fetch
This commit is contained in:
parent
d500f60665
commit
e438d9adb8
@ -6,17 +6,14 @@ import axios from "axios"
|
|||||||
import { Schematized } from "../../lib"
|
import { Schematized } from "../../lib"
|
||||||
import { User, StreamingKey, StreamingInfo, StreamingCategory } from "../../models"
|
import { User, StreamingKey, StreamingInfo, StreamingCategory } from "../../models"
|
||||||
|
|
||||||
const streamingIngestServer = process.env.STREAMING_INGEST_SERVER
|
const streamingIngestServer = process.env.STREAMING_INGEST_SERVER ?? ""
|
||||||
const streamingServerAPIAddress = process.env.STREAMING_API_SERVER
|
const streamingServerAPIAddress = process.env.STREAMING_API_SERVER ?? ""
|
||||||
const streamingServerAPIProtocol = streamingServerAPIAddress.startsWith("https") ? "https" : "http"
|
|
||||||
|
|
||||||
const streamingServerAPIUri = `${streamingServerAPIProtocol}://${streamingServerAPIAddress.split("://")[1]}`
|
const streamingServerAPIUri = `${streamingServerAPIAddress.startsWith("https") ? "https" : "http"}://${streamingServerAPIAddress.split("://")[1]}`
|
||||||
|
|
||||||
const FILTER_KEYS = ["stream"]
|
const FILTER_KEYS = ["stream"]
|
||||||
|
|
||||||
export default class StreamingController extends Controller {
|
export default class StreamingController extends Controller {
|
||||||
streamings = []
|
|
||||||
|
|
||||||
methods = {
|
methods = {
|
||||||
genereteKey: async (user_id) => {
|
genereteKey: async (user_id) => {
|
||||||
// this will generate a new key for the user
|
// this will generate a new key for the user
|
||||||
@ -35,78 +32,61 @@ export default class StreamingController extends Controller {
|
|||||||
|
|
||||||
return streamingKey
|
return streamingKey
|
||||||
},
|
},
|
||||||
regenerateStreamingList: async () => {
|
fetchStreams: async () => {
|
||||||
// fetch all streams from api
|
// fetch all streams from api
|
||||||
let streams = await axios.get(`${streamingServerAPIUri}/api/v1/streams`).catch((err) => {
|
let { data } = await axios.get(`${streamingServerAPIUri}/api/v1/streams`).catch((err) => {
|
||||||
console.log(err)
|
console.error(err)
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
||||||
if (streams) {
|
let streamings = []
|
||||||
streams = streams.data.streams
|
|
||||||
|
|
||||||
// FIXME: this method is not totally async
|
if (!data) return streamings
|
||||||
streams.forEach((stream) => {
|
|
||||||
// check if the stream is already in the list
|
|
||||||
const streamInList = this.streamings.find((s) => s.stream === stream.name)
|
|
||||||
|
|
||||||
if (!streamInList) {
|
streamings = data.streams
|
||||||
// if not, add it
|
|
||||||
this.methods.pushToLocalList({
|
streamings = streamings.map(async (stream) => {
|
||||||
stream: stream.name,
|
stream = await this.methods.generateStreamFromStreamkey(stream.name)
|
||||||
app: stream.app,
|
|
||||||
}).catch((err) => {
|
let info = await StreamingInfo.findOne({
|
||||||
// sorry for you
|
user_id: stream.user_id
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
|
||||||
},
|
|
||||||
pushToLocalList: async (payload) => {
|
|
||||||
const { stream, app } = payload
|
|
||||||
|
|
||||||
const username = app.split("/")[1]
|
if (info) {
|
||||||
const user_id = await User.findOne({ username }).then((user) => user._id)
|
stream.info = info.toObject()
|
||||||
|
|
||||||
const streamingKey = await StreamingKey.findOne({
|
stream.info.category = await StreamingCategory.findOne({
|
||||||
key: stream
|
key: stream.info.category
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream
|
||||||
})
|
})
|
||||||
|
|
||||||
if (!streamingKey) {
|
streamings = await Promise.all(streamings)
|
||||||
throw new Error("Invalid streaming key")
|
|
||||||
}
|
|
||||||
|
|
||||||
if (username !== streamingKey.username) {
|
return streamings.map((stream) => {
|
||||||
throw new Error("Invalid streaming key for this username")
|
return lodash.omit(stream, FILTER_KEYS)
|
||||||
}
|
})
|
||||||
|
},
|
||||||
|
generateStreamFromStreamkey: async (streamKey) => {
|
||||||
|
// generate a stream from a streamkey
|
||||||
|
const streamingKey = await StreamingKey.findOne({
|
||||||
|
key: streamKey
|
||||||
|
})
|
||||||
|
|
||||||
|
if (!streamingKey) return false
|
||||||
|
|
||||||
const streaming = {
|
const streaming = {
|
||||||
stream,
|
user_id: streamingKey.user_id,
|
||||||
user_id: user_id.toString(),
|
|
||||||
username: streamingKey.username,
|
username: streamingKey.username,
|
||||||
sources: {
|
sources: {
|
||||||
rtmp: `${streamingIngestServer}/live/${username}`,
|
rtmp: `${streamingIngestServer}/live/${streamingKey.username}`,
|
||||||
hls: `${streamingServerAPIAddress}/live/${username}/src.m3u8`,
|
hls: `${streamingServerAPIAddress}/live/${streamingKey.username}/src.m3u8`,
|
||||||
flv: `${streamingServerAPIAddress}/live/${username}/src.flv`,
|
flv: `${streamingServerAPIAddress}/live/${streamingKey.username}/src.flv`,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.streamings.push(streaming)
|
|
||||||
|
|
||||||
return streaming
|
|
||||||
},
|
|
||||||
removeFromLocalList: async (payload) => {
|
|
||||||
const { stream } = payload
|
|
||||||
|
|
||||||
// remove from streamings array
|
|
||||||
const streaming = this.streamings.find((streaming) => streaming.stream === stream)
|
|
||||||
|
|
||||||
if (!streaming) {
|
|
||||||
throw new Error("Stream not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
this.streamings = this.streamings.filter((streaming) => streaming.stream !== stream)
|
|
||||||
|
|
||||||
return streaming
|
return streaming
|
||||||
},
|
},
|
||||||
handleInfoUpdate: async (payload) => {
|
handleInfoUpdate: async (payload) => {
|
||||||
@ -154,31 +134,9 @@ export default class StreamingController extends Controller {
|
|||||||
return res.json(categories)
|
return res.json(categories)
|
||||||
},
|
},
|
||||||
"/streams": async (req, res) => {
|
"/streams": async (req, res) => {
|
||||||
await this.methods.regenerateStreamingList()
|
const remoteStreams = await this.methods.fetchStreams()
|
||||||
|
|
||||||
let data = this.streamings.map((stream) => {
|
return res.json(remoteStreams)
|
||||||
return lodash.omit(stream, FILTER_KEYS)
|
|
||||||
})
|
|
||||||
|
|
||||||
data = data.map(async (stream) => {
|
|
||||||
let info = await StreamingInfo.findOne({
|
|
||||||
user_id: stream.user_id
|
|
||||||
})
|
|
||||||
|
|
||||||
if (info) {
|
|
||||||
stream.info = info.toObject()
|
|
||||||
|
|
||||||
stream.info.category = await StreamingCategory.findOne({
|
|
||||||
key: stream.info.category
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return stream
|
|
||||||
})
|
|
||||||
|
|
||||||
data = await Promise.all(data)
|
|
||||||
|
|
||||||
return res.json(data)
|
|
||||||
},
|
},
|
||||||
"/stream/info": {
|
"/stream/info": {
|
||||||
middleware: ["withAuthentication"],
|
middleware: ["withAuthentication"],
|
||||||
@ -238,8 +196,10 @@ export default class StreamingController extends Controller {
|
|||||||
"/streaming/:username": async (req, res) => {
|
"/streaming/:username": async (req, res) => {
|
||||||
const { username } = req.params
|
const { username } = req.params
|
||||||
|
|
||||||
|
const streamings = await this.methods.fetchStreams()
|
||||||
|
|
||||||
// search on this.streamings
|
// search on this.streamings
|
||||||
const streaming = this.streamings.find((streaming) => streaming.username === username)
|
const streaming = streamings.find((streaming) => streaming.username === username)
|
||||||
|
|
||||||
if (streaming) {
|
if (streaming) {
|
||||||
return res.json(lodash.omit(streaming, FILTER_KEYS))
|
return res.json(lodash.omit(streaming, FILTER_KEYS))
|
||||||
@ -335,25 +295,22 @@ export default class StreamingController extends Controller {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"/streaming/publish": async (req, res) => {
|
"/streaming/publish": async (req, res) => {
|
||||||
const { app, stream, tcUrl } = req.body
|
const { stream } = req.body
|
||||||
|
|
||||||
|
const streaming = await this.methods.generateStreamFromStreamkey(stream).catch((err) => {
|
||||||
|
console.error(err)
|
||||||
|
|
||||||
const streaming = await this.methods.pushToLocalList({
|
|
||||||
app,
|
|
||||||
stream,
|
|
||||||
tcUrl
|
|
||||||
}).catch((err) => {
|
|
||||||
res.status(500).json({
|
res.status(500).json({
|
||||||
code: 1,
|
error: `Cannot generate stream: ${err.message}`,
|
||||||
error: err.message
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return false
|
return null
|
||||||
})
|
})
|
||||||
|
|
||||||
if (streaming) {
|
if (streaming) {
|
||||||
global.wsInterface.io.emit(`streaming.new`, {
|
global.wsInterface.io.emit(`streaming.new`, streaming)
|
||||||
username: streaming.username,
|
|
||||||
})
|
global.wsInterface.io.emit(`streaming.new.${streaming.username}`, streaming)
|
||||||
|
|
||||||
return res.json({
|
return res.json({
|
||||||
code: 0,
|
code: 0,
|
||||||
@ -364,21 +321,16 @@ export default class StreamingController extends Controller {
|
|||||||
"/streaming/unpublish": async (req, res) => {
|
"/streaming/unpublish": async (req, res) => {
|
||||||
const { stream } = req.body
|
const { stream } = req.body
|
||||||
|
|
||||||
const streaming = await this.methods.removeFromLocalList({
|
const streaming = await this.methods.generateStreamFromStreamkey(stream).catch((err) => {
|
||||||
stream
|
console.error(err)
|
||||||
}).catch((err) => {
|
|
||||||
res.status(500).json({
|
|
||||||
code: 2,
|
|
||||||
status: err.message
|
|
||||||
})
|
|
||||||
|
|
||||||
return false
|
return null
|
||||||
})
|
})
|
||||||
|
|
||||||
if (streaming) {
|
if (streaming) {
|
||||||
global.wsInterface.io.emit(`streaming.end`, {
|
global.wsInterface.io.emit(`streaming.end`, streaming)
|
||||||
username: streaming.username,
|
|
||||||
})
|
global.wsInterface.io.emit(`streaming.end.${streaming.username}`, streaming)
|
||||||
|
|
||||||
return res.json({
|
return res.json({
|
||||||
code: 0,
|
code: 0,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user