stream pipe with streamingUserspace username resolver

This commit is contained in:
srgooglo 2022-05-12 14:58:09 +02:00
parent a86918d3b9
commit 182040d37a

View File

@ -5,6 +5,9 @@ import MediaServer from "node-media-server"
import { SessionsManager, DbManager } from "./managers"
import { getStreamingKeyFromStreamPath } from "./lib"
import axios from "axios"
import stream from "stream"
import { StreamingKey } from "./models"
const HTTPServerConfig = {
@ -19,6 +22,10 @@ const MediaServerConfig = {
ping: 30,
ping_timeout: 60
},
http: {
port: 1000,
allow_origin: '*'
},
// trans: {
// ffmpeg: ffmpeg.path,
// tasks: [
@ -26,8 +33,6 @@ const MediaServerConfig = {
// app: "live",
// hls: true,
// hlsFlags: "[hls_time=2:hls_list_size=3:hls_flags=delete_segments]",
// dash: true,
// dashFlags: "[f=dash:window_size=3:extra_window_size=5]"
// }
// ]
// }
@ -66,40 +71,162 @@ class StreamingServer {
}
httpServerEndpoints = {
"/events/on-publish": {
method: "post",
fn: async (req, res) => {
req.body = Buffer.from(req.body).toString()
// decode url-encoded body
req.body = req.body.split("&").reduce((acc, cur) => {
const [key, value] = cur.split("=")
acc[key] = value
return acc
}, {})
const streamingKey = req.body.name
const streamingUserspace = await StreamingKey.findOne({
key: streamingKey
})
if (!streamingUserspace) {
return res.status(403).send("Invalid stream key")
}
this.Sessions.publishStream({
user_id: streamingUserspace.user_id,
stream_key: streamingKey
})
return res.send("OK")
}
},
"/events/on-publish-done": {
method: "post",
fn: async (req, res) => {
req.body = Buffer.from(req.body).toString()
// decode url-encoded body
req.body = req.body.split("&").reduce((acc, cur) => {
const [key, value] = cur.split("=")
acc[key] = value
return acc
}, {})
const streamingKey = req.body.name
const streamingUserspace = await StreamingKey.findOne({
key: streamingKey
})
if (!streamingUserspace) {
return res.status(403).send("Invalid stream key")
}
this.Sessions.unpublishStream(streamingKey)
return res.send("OK")
}
},
"/streams": {
method: "get",
fn: async (req, res) => {
return res.json(this.Sessions.publicStreams)
if (req.query?.user_id) {
const streams = await this.Sessions.getStreamsByUserId(req.query.user_id)
return res.json(streams)
}
return res.json(this.Sessions.getPublicStreams())
}
},
"/stream/:mode/:username": {
method: "get",
fn: async (req, res) => {
const { username, mode = "flv" } = req.params
const streamSession = this.Sessions.publicStreams.find(stream => {
if (stream.username === username) {
return stream
}
})
if (!streamSession) {
return res.status(404).json({
error: "Stream not found"
})
}
const streamKey = streamSession.stream_key
switch (mode) {
case "flv": {
const streamingFLVUri = `http://localhost:${MediaServerConfig.http.port}/live/${streamKey}.flv`
// create a stream pipe response using media server api with axios
const request = await axios.get(streamingFLVUri, {
responseType: "stream"
})
// create a buffer stream from the request
const bufferStream = request.data.pipe(new stream.PassThrough())
// set header for stream response
res.setHeader("Content-Type", "video/x-flv")
// pipe the buffer stream to the response
bufferStream.on("data", (chunk) => {
res.write(chunk)
})
break;
}
case "hls": {
const streamingHLSUri = `http://localhost:${MediaServerConfig.http.port}/live/${streamKey}.m3u8`
// create a stream pipe response using media server api with axios
const request = await axios.get(streamingHLSUri, {
responseType: "stream"
})
// create a buffer stream from the request
const bufferStream = request.data.pipe(new stream.PassThrough())
// set header for stream response
res.setHeader("Content-Type", "application/x-mpegURL")
// pipe the buffer stream to the response
bufferStream.on("data", (chunk) => {
res.write(chunk)
})
break;
}
default: {
return res.status(400).json({
error: "Stream mode not supported"
})
}
}
}
}
}
mediaServerEvents = {
preConnect: async (id, args) => {
// this event is fired after client is connected
// but session is not created yet & not ready to publish
prePublish: async (id, StreamPath, args) => {
// this event is fired before client is published
// here must be some validation (as key validation)
// get session
const session = this.IMediaServer.getSession(id)
// create a userspaced session for the client with containing session
this.Sessions.newSession(id, session)
},
postConnect: async (id, args) => {
// this event is fired after client is connected and session is created
// and is already published
},
doneConnect: async (id, args) => {
// this event is fired when client has ended the connection
// stop the session
this.Sessions.removeSession(id)
this.Sessions.unpublishStream(id)
},
prePublish: async (id, StreamPath, args) => {
// this event is fired before client is published
// here must be some validation (as key validation)
const streamingKey = getStreamingKeyFromStreamPath(StreamPath)
const streamingUserspace = await StreamingKey.findOne({
@ -114,14 +241,27 @@ class StreamingServer {
this.Sessions.publishStream({
id,
user_id: streamingUserspace.user_id,
username: streamingUserspace.username,
stream_key: streamingKey
})
},
donePublish: async (id, StreamPath, args) => {
// this event is fired when client has ended the connection
// stop the session
this.Sessions.removeSession(id)
const streamingKey = getStreamingKeyFromStreamPath(StreamPath)
this.Sessions.unpublishStream(streamingKey)
}
}
initialize = async () => {
await this.Db.connect()
this.IMediaServer.run()
this.IHTTPServer.initialize()
await this.IHTTPServer.initialize()
await this.IMediaServer.run()
}
}