From 182040d37a563f5609d86f0c15ecd1b84b05202f Mon Sep 17 00:00:00 2001 From: srgooglo Date: Thu, 12 May 2022 14:58:09 +0200 Subject: [PATCH] stream pipe with `streamingUserspace` username resolver --- packages/streaming-server/src/index.js | 186 ++++++++++++++++++++++--- 1 file changed, 163 insertions(+), 23 deletions(-) diff --git a/packages/streaming-server/src/index.js b/packages/streaming-server/src/index.js index 7bf93d20..598fc007 100644 --- a/packages/streaming-server/src/index.js +++ b/packages/streaming-server/src/index.js @@ -5,6 +5,9 @@ import MediaServer from "node-media-server" import { SessionsManager, DbManager } from "./managers" import { getStreamingKeyFromStreamPath } from "./lib" +import axios from "axios" +import stream from "stream" + import { StreamingKey } from "./models" const HTTPServerConfig = { @@ -19,6 +22,10 @@ const MediaServerConfig = { ping: 30, ping_timeout: 60 }, + http: { + port: 1000, + allow_origin: '*' + }, // trans: { // ffmpeg: ffmpeg.path, // tasks: [ @@ -26,8 +33,6 @@ const MediaServerConfig = { // app: "live", // hls: true, // hlsFlags: "[hls_time=2:hls_list_size=3:hls_flags=delete_segments]", - // dash: true, - // dashFlags: "[f=dash:window_size=3:extra_window_size=5]" // } // ] // } @@ -66,40 +71,162 @@ class StreamingServer { } httpServerEndpoints = { + "/events/on-publish": { + method: "post", + fn: async (req, res) => { + req.body = Buffer.from(req.body).toString() + + // decode url-encoded body + req.body = req.body.split("&").reduce((acc, cur) => { + const [key, value] = cur.split("=") + acc[key] = value + + return acc + }, {}) + + const streamingKey = req.body.name + + const streamingUserspace = await StreamingKey.findOne({ + key: streamingKey + }) + + if (!streamingUserspace) { + return res.status(403).send("Invalid stream key") + } + + this.Sessions.publishStream({ + user_id: streamingUserspace.user_id, + stream_key: streamingKey + }) + + return res.send("OK") + } + }, + "/events/on-publish-done": { + method: "post", + fn: async (req, res) => { + req.body = Buffer.from(req.body).toString() + + // decode url-encoded body + req.body = req.body.split("&").reduce((acc, cur) => { + const [key, value] = cur.split("=") + acc[key] = value + + return acc + }, {}) + + const streamingKey = req.body.name + + const streamingUserspace = await StreamingKey.findOne({ + key: streamingKey + }) + + if (!streamingUserspace) { + return res.status(403).send("Invalid stream key") + } + + this.Sessions.unpublishStream(streamingKey) + + return res.send("OK") + } + }, "/streams": { method: "get", fn: async (req, res) => { - return res.json(this.Sessions.publicStreams) + if (req.query?.user_id) { + const streams = await this.Sessions.getStreamsByUserId(req.query.user_id) + + return res.json(streams) + } + + return res.json(this.Sessions.getPublicStreams()) + } + }, + "/stream/:mode/:username": { + method: "get", + fn: async (req, res) => { + const { username, mode = "flv" } = req.params + + const streamSession = this.Sessions.publicStreams.find(stream => { + if (stream.username === username) { + return stream + } + }) + + if (!streamSession) { + return res.status(404).json({ + error: "Stream not found" + }) + } + + const streamKey = streamSession.stream_key + + switch (mode) { + case "flv": { + const streamingFLVUri = `http://localhost:${MediaServerConfig.http.port}/live/${streamKey}.flv` + + // create a stream pipe response using media server api with axios + const request = await axios.get(streamingFLVUri, { + responseType: "stream" + }) + + // create a buffer stream from the request + const bufferStream = request.data.pipe(new stream.PassThrough()) + + // set header for stream response + res.setHeader("Content-Type", "video/x-flv") + + // pipe the buffer stream to the response + bufferStream.on("data", (chunk) => { + res.write(chunk) + }) + + break; + } + + case "hls": { + const streamingHLSUri = `http://localhost:${MediaServerConfig.http.port}/live/${streamKey}.m3u8` + + // create a stream pipe response using media server api with axios + const request = await axios.get(streamingHLSUri, { + responseType: "stream" + }) + + // create a buffer stream from the request + const bufferStream = request.data.pipe(new stream.PassThrough()) + + // set header for stream response + res.setHeader("Content-Type", "application/x-mpegURL") + + // pipe the buffer stream to the response + bufferStream.on("data", (chunk) => { + res.write(chunk) + }) + + break; + } + + default: { + return res.status(400).json({ + error: "Stream mode not supported" + }) + } + } } } } mediaServerEvents = { - preConnect: async (id, args) => { - // this event is fired after client is connected - // but session is not created yet & not ready to publish + prePublish: async (id, StreamPath, args) => { + // this event is fired before client is published + // here must be some validation (as key validation) // get session const session = this.IMediaServer.getSession(id) // create a userspaced session for the client with containing session this.Sessions.newSession(id, session) - }, - postConnect: async (id, args) => { - // this event is fired after client is connected and session is created - // and is already published - }, - doneConnect: async (id, args) => { - // this event is fired when client has ended the connection - // stop the session - this.Sessions.removeSession(id) - - this.Sessions.unpublishStream(id) - }, - prePublish: async (id, StreamPath, args) => { - // this event is fired before client is published - // here must be some validation (as key validation) const streamingKey = getStreamingKeyFromStreamPath(StreamPath) const streamingUserspace = await StreamingKey.findOne({ @@ -114,14 +241,27 @@ class StreamingServer { this.Sessions.publishStream({ id, user_id: streamingUserspace.user_id, + username: streamingUserspace.username, + stream_key: streamingKey }) + }, + donePublish: async (id, StreamPath, args) => { + // this event is fired when client has ended the connection + + // stop the session + this.Sessions.removeSession(id) + + const streamingKey = getStreamingKeyFromStreamPath(StreamPath) + + this.Sessions.unpublishStream(streamingKey) } } initialize = async () => { await this.Db.connect() - this.IMediaServer.run() - this.IHTTPServer.initialize() + + await this.IHTTPServer.initialize() + await this.IMediaServer.run() } }