Merge pull request #57 from ragestudio/streaming-server

Streaming server
This commit is contained in:
srgooglo 2022-05-12 19:32:03 +02:00 committed by GitHub
commit dcc3eb2151
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 1315 additions and 149 deletions

View File

@ -12,11 +12,10 @@ export default {
alt: "/logo_alt.svg",
full: "/logo_full.svg",
},
api: {
address: defaultRemotesOrigins.http_api,//process.env.NODE_ENV !== "production" ? `http://${window.location.hostname}:3000` : defaultRemotesOrigins.http_api,
},
ws: {
address: defaultRemotesOrigins.ws_api, //process.env.NODE_ENV !== "production" ? `ws://${window.location.hostname}:3001` : defaultRemotesOrigins.ws_api,
remotes: {
mainApi: defaultRemotesOrigins.main_api, //process.env.NODE_ENV !== "production" ? `http://${window.location.hostname}:3000` : defaultRemotesOrigins.http_api
streamingApi: defaultRemotesOrigins.streaming_api, //process.env.NODE_ENV !== "production" ? `ws://${window.location.hostname}:3001` : defaultRemotesOrigins.ws_api
websocketApi: defaultRemotesOrigins.ws_api,
},
app: {
title: packagejson.name,
@ -41,6 +40,6 @@ export default {
name: "Español"
}
],
defaultLocale: "es",
defaultLocale: "en",
}
}

View File

@ -1,4 +1,5 @@
{
"themeVariant": "light",
"forceMobileMode": false,
"notifications_sound": true,
"notifications_vibrate": true,
@ -13,6 +14,7 @@
"main",
"explore",
"saved",
"marketplace"
"marketplace",
"streams",
]
}

View File

@ -18,5 +18,15 @@
"id": "marketplace",
"title": "Marketplace",
"icon": "Package"
},
{
"id": "streams",
"title": "Streams",
"icon": "Tv"
},
{
"id": "streaming_control",
"title": "Streaming Control",
"icon": "Video"
}
]

View File

@ -146,14 +146,17 @@ export default [
"experimental": true
},
{
"experimental": true,
"id": "darkMode",
"storaged": true,
"group": "aspect",
"type": "Switch",
"icon": "Moon",
"title": "Dark mode",
"emitEvent": "darkMode",
"experimental": true
"emitEvent": "theme.applyVariant",
"emissionValueUpdate": (value) => {
return value ? "dark" : "light"
},
},
{
"id": "primaryColor",

View File

@ -74,6 +74,7 @@
"react-router-config": "^5.1.1",
"react-router-dom": "6.2.1",
"react-virtualized": "^9.22.3",
"rxjs": "^7.5.5",
"store": "^2.0.12",
"styled-components": "^5.3.3",
"vite-ssr": "0.15.0"

View File

@ -87,8 +87,8 @@ export default class ApiExtension extends Extension {
}
return new Bridge({
origin: config.api.address,
wsOrigin: config.ws.address,
origin: config.remotes.mainApi,
wsOrigin: config.remotes.websocketApi,
wsOptions: {
autoConnect: false,
},

View File

@ -1,6 +1,7 @@
import { Extension } from "evite"
import store from "store"
import defaultSettings from "schemas/defaultSettings.json"
import { Observable } from "rxjs"
export default class SettingsExtension extends Extension {
constructor(app, main) {
@ -52,6 +53,23 @@ export default class SettingsExtension extends Extension {
return this.settings[key]
}
withEvent = (listenEvent, defaultValue) => {
let value = defaultValue ?? this.settings[key] ?? false
const observable = new Observable((subscriber) => {
subscriber.next(value)
window.app.eventBus.on(listenEvent, (to) => {
value = to
subscriber.next(value)
})
})
return observable.subscribe((value) => {
return value
})
}
window = {
"settings": this
}

View File

@ -9,7 +9,6 @@ export default class ThemeExtension extends Extension {
this.themeManifestStorageKey = "theme"
this.modificationStorageKey = "themeModifications"
this.variantStorageKey = "themeVariation"
this.theme = null
@ -19,12 +18,9 @@ export default class ThemeExtension extends Extension {
initializers = [
async () => {
this.mainContext.eventBus.on("darkMode", (value) => {
if (value) {
this.applyVariant("dark")
} else {
this.applyVariant("light")
}
this.mainContext.eventBus.on("theme.applyVariant", (value) => {
this.applyVariant(value)
this.setVariant(value)
})
this.mainContext.eventBus.on("modifyTheme", (value) => {
this.update(value)
@ -95,11 +91,11 @@ export default class ThemeExtension extends Extension {
}
getStoragedVariant = () => {
return store.get(this.variantStorageKey)
return app.settings.get("themeVariant")
}
setVariant = (variationKey) => {
return store.set(this.variantStorageKey, variationKey)
return app.settings.set("themeVariant", variationKey)
}
setModifications = (modifications) => {
@ -142,7 +138,6 @@ export default class ThemeExtension extends Extension {
if (values) {
this.currentVariant = variant
this.update(values)
this.setVariant(variant)
}
}

View File

@ -27,5 +27,6 @@
opacity: 0;
height: 0 !important;
padding: 0 !important;
border: 0 !important;
}
}

View File

@ -12,7 +12,7 @@ const LayoutRenders = {
mobile: (props) => {
return <antd.Layout className={classnames("app_layout", ["mobile"])} style={{ height: "100%" }}>
<antd.Layout className="content_layout">
<antd.Layout.Content className="layout_page">
<antd.Layout.Content className={classnames("layout_page", ...props.layoutPageModesClassnames ?? [])}>
<div className={classnames("fade-transverse-active", { "fade-transverse-leave": props.isOnTransition })}>
{props.children}
</div>
@ -28,7 +28,7 @@ const LayoutRenders = {
<Sidebar user={props.user} />
<antd.Layout className="content_layout">
<Header />
<antd.Layout.Content className="layout_page">
<antd.Layout.Content className={classnames("layout_page", ...props.layoutPageModesClassnames ?? [])}>
<div className={classnames("fade-transverse-active", { "fade-transverse-leave": props.isOnTransition })}>
{props.children}
</div>
@ -43,6 +43,7 @@ export default class Layout extends React.Component {
state = {
layoutType: "default",
isOnTransition: false,
compactMode: false,
}
setLayout = (layout) => {
@ -62,6 +63,11 @@ export default class Layout extends React.Component {
window.app.eventBus.on("transitionDone", () => {
this.setState({ isOnTransition: false })
})
window.app.eventBus.on("toogleCompactMode", (to) => {
this.setState({
compactMode: to ?? !this.state.compactMode,
})
})
if (window.app.settings.get("forceMobileMode") || window.app.isAppCapacitor() || Math.min(window.screen.width, window.screen.height) < 768 || navigator.userAgent.indexOf("Mobi") > -1) {
window.isMobile = true
@ -85,6 +91,9 @@ export default class Layout extends React.Component {
const layoutComponentProps = {
...this.props,
...this.state,
layoutPageModesClassnames: [{
["noMargin"]: this.state.compactMode,
}]
}
if (LayoutRenders[this.state.layoutType]) {

View File

@ -0,0 +1,140 @@
import React from "react"
import * as antd from "antd"
import { Icons } from "components/Icons"
import "./index.less"
const StreamingKeyView = (props) => {
const [streamingKeyVisibility, setStreamingKeyVisibility] = React.useState(false)
const toogleVisibility = (to) => {
setStreamingKeyVisibility(to ?? !streamingKeyVisibility)
}
return <div className="streamingKeyString">
{streamingKeyVisibility ?
<>
<Icons.EyeOff onClick={() => toogleVisibility()} />
<h4>
{props.streamingKey ?? "No streaming key available"}
</h4>
</> :
<>
<Icons.Eye onClick={() => toogleVisibility()} />
Show key
</>
}
</div>
}
export default (props) => {
const [isConnected, setIsConnected] = React.useState(false)
const [targetServer, setTargetServer] = React.useState("No available server")
const [streamingKey, setStreamingKey] = React.useState(null)
const [serverTier, setServerTier] = React.useState(null)
const checkStreamingKey = async () => {
const result = await app.request.get.streamingKey().catch((error) => {
console.error(error)
antd.message.error(error.message)
return null
})
if (result) {
setStreamingKey(result.key)
}
}
const checkTagetServer = async () => {
const result = await app.request.get.targetStreamingServer()
if (result) {
const targetServer = `${result.protocol}://${result.address}:${result.port}/${result.space}`
setTargetServer(targetServer)
}
}
const regenerateStreamingKey = async () => {
antd.Modal.confirm({
title: "Regenerate streaming key",
content: "Are you sure you want to regenerate the streaming key? After this, all other generated keys will be deleted.",
onOk: async () => {
const result = await app.request.post.regenerateStreamingKey().catch((error) => {
console.error(error)
antd.message.error(error.message)
return null
})
if (result) {
setStreamingKey(result.key)
}
}
})
}
React.useEffect(() => {
checkStreamingKey()
checkTagetServer()
// TODO: Use UserTier controller to check streaming service tier
// by now, we just use a fixed value
setServerTier("basic")
}, [])
return <div className="streamingControlPanel">
<div>
<h2><Icons.MdSettingsInputAntenna /> Connection Status</h2>
<div>
<antd.Tag
color={isConnected ? "Blue" : "Red"}
icon={isConnected ? <Icons.MdOutlineVideocam /> : <Icons.MdOutlineVideocamOff />}
>
{isConnected ? "Connected" : "Disconnected"}
</antd.Tag>
</div>
</div>
<div>
<h2><Icons.Info /> Server info</h2>
<div className="info">
<div className="label">
<Icons.Server />
Server Address
</div>
<div className="value">
<h4>
{targetServer}
</h4>
</div>
</div>
<div className="info">
<div className="label">
<Icons.Key />
Streaming Key
</div>
<div className="value">
<StreamingKeyView streamingKey={streamingKey} />
</div>
<div>
<antd.Button onClick={() => regenerateStreamingKey()}>
<Icons.RefreshCw />
Regenerate
</antd.Button>
</div>
</div>
<div className="info">
<div className="label">
<Icons.MdSettingsInputSvideo />
Usage Tier
</div>
<div className="value">
<antd.Tag>
{serverTier}
</antd.Tag>
</div>
</div>
</div>
</div>
}

View File

@ -0,0 +1,36 @@
.streamingControlPanel {
display: inline-flex;
flex-direction: column;
.info {
display: flex;
flex-direction: column;
margin-bottom: 10px;
.label {
}
.value {
margin-left: 10px;
font-family: "DM Mono", monospace;
h4 {
// select all text
user-select: all;
margin: 0;
}
}
}
> div {
margin-bottom: 20px;
}
}
.streamingKeyString {
display: inline-flex;
flex-direction: row;
align-items: center;
}

View File

@ -1,8 +1,9 @@
import React from 'react'
import axios from "axios"
import React from "react"
import * as antd from "antd"
import { SelectableList, ActionsBar } from "components"
import "./index.less"
export default class Streams extends React.Component {
state = {
list: [],
@ -15,31 +16,62 @@ export default class Streams extends React.Component {
}
updateStreamsList = async () => {
const streams = await this.api.get.streams().catch(error => {
let streams = await this.api.get.streams().catch(error => {
console.error(error)
antd.message.error(error)
return false
})
this.setState({ list: streams })
if (streams && Array.isArray(streams)) {
// resolve user_id with user basic data
streams = streams.map(async (stream) => {
const userData = await this.api.get.user(undefined, { user_id: stream.user_id }).catch((error) => {
console.error(error)
antd.message.error(error)
return false
})
if (userData) {
stream.userData = userData
}
return stream
})
streams = await Promise.all(streams)
}
this.setState({ list: streams })
}
onClickItem = (item) => {
window.app.setLocation(`/streams/viewer?key=${item}`)
}
renderListItem = (stream) => {
stream.StreamPath = stream.StreamPath.replace(/^\/live\//, "")
return <div key={stream.id} onClick={() => this.onClickItem(stream.StreamPath)}>
<h1>@{stream.StreamPath} #{stream.id}</h1>
return <div
key={stream.id}
onClick={() => this.onClickItem(stream.username)}
className="streaming-item"
>
<div className="thumbnail">
<img src={stream.userData.avatar} alt={stream.userData.username} />
</div>
<div className="details">
<div className="title">
<h1>@{stream.userData.username}</h1>
<span>
#{stream.id}
</span>
</div>
</div>
</div>
}
render() {
return <div>
return <div className="streams">
<ActionsBar mode="float">
<div>
<antd.Button onClick={this.updateStreamsList}>Refresh</antd.Button>

View File

@ -0,0 +1,49 @@
.streams {
.selectableList_content {
display: flex;
flex-wrap: wrap;
.selectableList_item {
max-width: 20vw;
}
.streaming-item {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
padding: 10px;
.thumbnail {
width: 15vw;
height: 100px;
background-size: cover;
img {
width: 100%;
height: 100%;
}
}
.details {
.title {
display: inline-flex;
align-items: center;
h1{
font-size: 1.5em;
font-weight: bold;
margin-right: 10px;
margin-bottom: 0;
}
span {
font-size: 0.8em;
}
}
}
}
}
}

View File

@ -1,15 +1,25 @@
import React from 'react'
import React from "react"
import config from "config"
import * as antd from "antd"
import Plyr from 'plyr'
import Hls from 'hls.js'
import mpegts from 'mpegts.js'
import { Icons } from "components/Icons"
import moment from "moment"
import Plyr from "plyr"
import Hls from "hls.js"
import mpegts from "mpegts.js"
import "plyr/dist/plyr.css"
import "./index.less"
const streamsSource = "http://media.ragestudio.net/live"
const streamsSource = config.remotes.streamingApi
export default class StreamViewer extends React.Component {
state = {
userData: null,
streamInfo: null,
spectators: 0,
timeFromNow: "00:00:00",
player: null,
streamKey: null,
streamSource: null,
@ -22,18 +32,98 @@ export default class StreamViewer extends React.Component {
componentDidMount = async () => {
const query = new URLSearchParams(window.location.search)
const requested = query.get("key")
const requestedUsername = query.get("key")
const source = `${streamsSource}/${requested}`
const player = new Plyr('#player')
const source = `${streamsSource}/${requestedUsername}`
const player = new Plyr("#player", {
autoplay: true,
controls: ["play", "mute", "volume", "fullscreen", "options", "settings"],
})
await this.setState({
player,
streamKey: requested,
streamKey: requestedUsername,
streamSource: source,
})
await this.loadWithProtocol[this.state.loadedProtocol]()
// make the interface a bit confortable for a video player
app.ThemeController.applyVariant("dark")
app.eventBus.emit("toogleCompactMode", true)
app.SidebarController.toogleVisible(false)
app.HeaderController.toogleVisible(false)
// fetch user info in the background
this.gatherUserInfo()
// fetch stream info in the background
// await for it
await this.gatherStreamInfo()
// create timer
if (this.state.streamInfo.connectCreated) {
this.createTimerCounter()
}
}
componentWillUnmount = () => {
app.ThemeController.applyVariant(app.settings.get("themeVariant"))
app.eventBus.emit("toogleCompactMode", false)
app.SidebarController.toogleVisible(true)
app.HeaderController.toogleVisible(true)
app.HeaderController.toogleVisible(true)
if (this.timerCounterInterval) {
this.timerCounterInterval = clearInterval(this.timerCounterInterval)
}
}
gatherStreamInfo = async () => {
const result = await app.request.get.streamInfoFromUsername(undefined, {
username: this.state.streamKey,
}).catch((error) => {
console.error(error)
antd.message.error(error.message)
return false
})
if (result) {
this.setState({
streamInfo: result,
})
}
}
gatherUserInfo = async () => {
const result = await app.request.get.user(undefined, {
username: this.state.streamKey,
}).catch((error) => {
console.error(error)
antd.message.error(error.message)
return false
})
if (result) {
this.setState({
userData: result,
})
}
}
createTimerCounter = () => {
this.timerCounterInterval = setInterval(() => {
const secondsFromNow = moment().diff(moment(this.state.streamInfo.connectCreated), "seconds")
// calculate hours minutes and seconds
const hours = Math.floor(secondsFromNow / 3600)
const minutes = Math.floor((secondsFromNow - hours * 3600) / 60)
const seconds = secondsFromNow - hours * 3600 - minutes * 60
this.setState({
timeFromNow: `${hours}:${minutes}:${seconds}`,
})
}, 1000)
}
updateQuality = (newQuality) => {
@ -63,7 +153,7 @@ export default class StreamViewer extends React.Component {
loadWithProtocol = {
hls: () => {
const source = `${this.state.streamSource}.m3u8`
const source = `${streamsSource}/stream/hls/${this.state.streamKey}`
const hls = new Hls()
hls.loadSource(source)
@ -72,9 +162,13 @@ export default class StreamViewer extends React.Component {
this.setState({ protocolInstance: hls, loadedProtocol: "hls" })
},
flv: () => {
const source = `${this.state.streamSource}.flv`
const source = `${streamsSource}/stream/flv/${this.state.streamKey}`
const instance = mpegts.createPlayer({ type: 'flv', url: source, isLive: true })
const instance = mpegts.createPlayer({
type: "flv",
url: source,
isLive: true
})
instance.attachMediaElement(this.videoPlayerRef.current)
instance.load()
@ -85,15 +179,39 @@ export default class StreamViewer extends React.Component {
}
render() {
return <div>
<antd.Select
onChange={(value) => this.switchProtocol(value)}
value={this.state.loadedProtocol}
>
<antd.Select.Option value="hls">HLS</antd.Select.Option>
<antd.Select.Option value="flv">FLV</antd.Select.Option>
</antd.Select>
return <div className="stream">
<video ref={this.videoPlayerRef} id="player" />
<div className="panel">
<div className="info">
<div className="title">
<div>
<antd.Avatar
shape="square"
src={this.state.userData?.avatar}
/>
</div>
<div>
<h2>{this.state.userData?.username}</h2>
</div>
</div>
<div id="spectatorCount">
<Icons.Eye />
{this.state.spectators}
</div>
<div id="timeCount">
<Icons.Clock />
{this.state.timeFromNow}
</div>
</div>
<div className="chatbox">
{/* TODO: Use chatbox component and join to stream channel using username */}
<antd.Result>
<h1>
Cannot connect with chat server
</h1>
</antd.Result>
</div>
</div>
</div>
}
}

View File

@ -0,0 +1,92 @@
.plyr__controls {
width: 100%;
display: inline-flex;
//justify-content: space-between;
}
.stream {
display: flex;
flex-direction: row;
align-items: center;
justify-content: center;
height: 100vh;
width: 100vw;
color: var(--background-color-contrast);
h1,
h2,
h3,
h4,
h5,
span,
p {
color: var(--background-color-contrast);
}
.panel {
display: flex;
flex-direction: column;
height: 100vh;
width: 20vw;
.info {
display: flex;
flex-direction: column;
justify-content: flex-start;
align-items: flex-start;
width: 100%;
height: 10vh;
padding: 10px;
backdrop-filter: 20px;
h1,
h2,
h3,
h4,
h5 {
margin: 0;
}
>div {
display: flex;
flex-direction: row;
align-items: center;
justify-content: center;
height: fit-content;
margin-bottom: 8px;
>div {
margin-right: 8px;
}
}
}
.chatbox {
width: 20vw;
padding: 20px;
height: 100vh;
}
#spectatorCount {
font-size: 0.8em;
}
#timeCount {
font-size: 0.8em;
}
}
.plyr {
border-radius: 0 4px 4px 0;
width: 80vw;
height: 100vh;
}
}

View File

@ -136,6 +136,12 @@ body {
overflow-x: hidden;
overflow-y: overlay;
transition: all 150ms ease-in-out;
&.noMargin {
margin: 0;
}
}
@media (max-width: 768px) {
@ -319,3 +325,27 @@ body {
opacity: 0;
}
}
.ant-result {
.ant-result-content {
display: inline-flex;
align-items: center;
justify-content: center;
text-align: center;
padding: 10px;
background-color: var(--background-color-accent);
color: var(--background-color-primary);
h1,
h2,
h3,
h4,
h5,
p,
span {
margin: 0;
}
}
}

View File

@ -2,11 +2,17 @@ div {
color: var(--text-color);
}
h1, h2, h3, h4, h5, h6 {
h1,
h2,
h3,
h4,
h5,
h6 {
color: var(--header-text-color);
}
a, p {
a,
p {
color: var(--text-color);
}
@ -22,7 +28,9 @@ svg:not(.ant-tag *) {
color: var(--svg-color);
}
input, .ant-input-affix-wrapper, .ant-input {
input,
.ant-input-affix-wrapper,
.ant-input {
color: var(--text-color) !important;
background-color: var(--background-color-accent);
}
@ -41,7 +49,10 @@ tr {
background-color: var(--background-color-accent) !important;
}
.ant-table, .ant-table-content, .ant-table-thead, .ant-table-cell {
.ant-table,
.ant-table-content,
.ant-table-thead,
.ant-table-cell {
background-color: var(--background-color-accent);
}

View File

@ -0,0 +1,139 @@
import { Controller } from "linebridge/dist/server"
import { User, StreamingKey } from "../../models"
import { nanoid } from "nanoid"
import axios from "axios"
const streamingMediaServer = process.env.streamingMediaServer ?? "media.ragestudio.net"
const streamingServerAPIAddress = process.env.streamingServerAPIAddress ?? "media.ragestudio.net"
const streamingServerAPIPort = process.env.streamingServerAPIPort ?? 3002
const streamingServerAPIProtocol = process.env.streamingServerAPIProtocol ?? "http"
const streamingServerAPIUri = `${streamingServerAPIProtocol}://${streamingServerAPIAddress}:${streamingServerAPIPort}`
export default class StreamingController extends Controller {
static useMiddlewares = ["withAuthentication"]
methods = {
genereteKey: async (user_id) => {
// this will generate a new key for the user
// if the user already has a key, it will be regenerated
// get username from user_id
const userData = await User.findOne({ user_id: user_id })
const streamingKey = new StreamingKey({
user_id,
username: userData.username,
key: nanoid()
})
await streamingKey.save()
return streamingKey
}
}
get = {
"/stream_info_from_username": async (req, res) => {
const { username } = req.query
const userspace = await StreamingKey.findOne({ username })
if (!userspace) {
return res.status(403).json({
error: "This username has not a streaming key"
})
}
// TODO: meanwhile linebridge remote linkers are in development we gonna use this methods to fetch
const { data } = await axios.get(`${streamingServerAPIUri}/streams`, {
params: {
username: userspace.username
}
}).catch((error) => {
res.status(500).json({
error: `Failed to fetch streams from [${streamingServerAPIAddress}]: ${error.message}`
})
return false
})
return res.json(data)
},
"/streams": async (req, res) => {
// TODO: meanwhile linebridge remote linkers are in development we gonna use this methods to fetch
const { data } = await axios.get(`${streamingServerAPIUri}/streams`).catch((error) => {
res.status(500).json({
error: `Failed to fetch streams from [${streamingServerAPIAddress}]: ${error.message}`
})
return false
})
if (data) {
return res.json(data)
}
},
"/target_streaming_server": async (req, res) => {
// TODO: resolve an available server
// for now we just return the only one should be online
return res.json({
protocol: "rtmp",
port: "1935",
space: "live",
address: streamingMediaServer,
})
},
"/streaming_key": async (req, res) => {
let streamingKey = await StreamingKey.findOne({
user_id: req.user._id.toString()
})
if (!streamingKey) {
const newKey = await this.methods.genereteKey(req.user._id.toString()).catch(err => {
res.status(500).json({
error: `Cannot generate a new key: ${err.message}`,
})
return false
})
if (!newKey) {
return false
}
return res.json(newKey)
} else {
return res.json(streamingKey)
}
}
}
post = {
"/regenerate_streaming_key": async (req, res) => {
// check if the user already has a key
let streamingKey = await StreamingKey.findOne({
user_id: req.user._id.toString()
})
// if exists, delete it
if (streamingKey) {
await streamingKey.remove()
}
// generate a new key
const newKey = await this.methods.genereteKey(req.user._id.toString()).catch(err => {
res.status(500).json({
error: `Cannot generate a new key: ${err.message}`,
})
return false
})
if (!newKey) {
return false
}
return res.json(newKey)
}
}
}

View File

@ -5,6 +5,7 @@ import { default as UserController } from "./UserController"
import { default as FilesController } from "./FilesController"
import { default as PublicController } from "./PublicController"
import { default as PostsController } from "./PostsController"
import { default as StreamingController } from "./StreamingController"
export default [
PostsController,
@ -14,4 +15,5 @@ export default [
SessionController,
UserController,
FilesController,
StreamingController,
]

View File

@ -28,5 +28,6 @@ export const Post = mongoose.model("Post", schemas.Post, "posts")
export const Comment = mongoose.model("Comment", schemas.Comment, "comments")
// streamings
export const StreamingKey = mongoose.model("StreamingKey", schemas.streamingKey, "streamingKeys")
// marketplace

View File

@ -6,3 +6,4 @@ export { default as Post } from "./post"
export { default as Comment } from "./comment"
export { default as UserFollow } from "./userFollow"
export { default as Badge } from "./badge"
export { default as streamingKey } from "./streamingKey"

View File

@ -0,0 +1,14 @@
export default {
username: {
type: String,
required: true,
},
user_id: {
type: String,
required: true,
},
key: {
type: String,
required: true,
}
}

View File

@ -0,0 +1,20 @@
{
"name": "@comty/streaming-server",
"author": "RageStudio",
"version": "0.1.0",
"main": "dist/index.js",
"scripts": {
"dev": "nodemon --ignore dist/ --exec corenode-node ./src/index.js"
},
"dependencies": {
"@ffmpeg-installer/ffmpeg": "^1.1.0",
"linebridge": "^0.11.13",
"lodash": "^4.17.21",
"mongoose": "^6.3.3",
"node-media-server": "^2.3.9"
},
"devDependencies": {
"cross-env": "^7.0.3",
"nodemon": "^2.0.15"
}
}

View File

@ -0,0 +1,307 @@
const ffmpeg = require("@ffmpeg-installer/ffmpeg")
import lodash from "lodash"
import { Server } from "linebridge/dist/server"
import MediaServer from "node-media-server"
import { SessionsManager, DbManager } from "./managers"
import { getStreamingKeyFromStreamPath } from "./lib"
import axios from "axios"
import stream from "stream"
import { StreamingKey } from "./models"
const HTTPServerConfig = {
port: 3002,
}
const MediaServerConfig = {
rtmp: {
port: 1935,
chunk_size: 60000,
gop_cache: true,
ping: 30,
ping_timeout: 60
},
http: {
port: 1000,
allow_origin: '*'
},
trans: {
ffmpeg: ffmpeg.path,
tasks: [
{
app: "live",
hls: true,
hlsFlags: "[hls_time=2:hls_list_size=3:hls_flags=delete_segments]",
}
]
}
}
const internalMediaServerURI = `http://127.0.0.1:${MediaServerConfig.http.port}`
class StreamingServer {
IHTTPServer = new Server(HTTPServerConfig)
IMediaServer = new MediaServer(MediaServerConfig)
Db = new DbManager()
Sessions = new SessionsManager()
constructor() {
this.registerMediaServerEvents()
this.registerHTTPServerEndpoints()
// fire initization
this.initialize()
}
registerMediaServerEvents = () => {
Object.keys(this.mediaServerEvents).forEach((eventName) => {
this.IMediaServer.on(eventName, this.mediaServerEvents[eventName])
})
}
registerHTTPServerEndpoints = () => {
Object.keys(this.httpServerEndpoints).forEach((route) => {
this.IHTTPServer.registerHTTPEndpoint({
route: route,
...this.httpServerEndpoints[route]
})
})
}
httpServerEndpoints = {
"/events/on-publish": {
method: "post",
fn: async (req, res) => {
req.body = Buffer.from(req.body).toString()
// decode url-encoded body
req.body = req.body.split("&").reduce((acc, cur) => {
const [key, value] = cur.split("=")
acc[key] = value
return acc
}, {})
const streamingKey = req.body.name
const streamingUserspace = await StreamingKey.findOne({
key: streamingKey
})
if (!streamingUserspace) {
return res.status(403).send("Invalid stream key")
}
this.Sessions.publishStream({
user_id: streamingUserspace.user_id,
stream_key: streamingKey
})
return res.send("OK")
}
},
"/events/on-publish-done": {
method: "post",
fn: async (req, res) => {
req.body = Buffer.from(req.body).toString()
// decode url-encoded body
req.body = req.body.split("&").reduce((acc, cur) => {
const [key, value] = cur.split("=")
acc[key] = value
return acc
}, {})
const streamingKey = req.body.name
const streamingUserspace = await StreamingKey.findOne({
key: streamingKey
})
if (!streamingUserspace) {
return res.status(403).send("Invalid stream key")
}
this.Sessions.unpublishStream(streamingKey)
return res.send("OK")
}
},
"/streams": {
method: "get",
fn: async (req, res) => {
let streams = []
if (req.query?.username) {
streams = await this.Sessions.getStreamsByUsername(req.query?.username)
} else {
streams = this.Sessions.getPublicStreams()
}
// retrieve streams details from internal media server api
let streamsListDetails = await axios.get(`${internalMediaServerURI}/api/streams`)
streamsListDetails = streamsListDetails.data.live ?? {}
// return only publisher details
streamsListDetails = Object.keys(streamsListDetails).map((streamKey) => {
return {
// filter unwanted properties
...lodash.omit(streamsListDetails[streamKey].publisher, ["stream", "ip"])
}
})
// reduce as an object
streamsListDetails = streamsListDetails.reduce((acc, cur) => {
acc[cur.clientId] = cur
return acc
}, {})
// merge with public streams
streams = streams.map((stream) => {
return {
...stream,
...streamsListDetails[stream.id]
}
})
// if username is provided, return only streams for that user
// is supposed to be allowed only one stream per user
if (req.query?.username) {
return res.json(streams[0])
}
return res.json(streams)
}
},
"/stream/:mode/:username": {
method: "get",
fn: async (req, res) => {
const { username, mode = "flv" } = req.params
const streamSession = this.Sessions.publicStreams.find(stream => {
if (stream.username === username) {
return stream
}
})
if (!streamSession) {
return res.status(404).json({
error: "Stream not found"
})
}
const streamKey = streamSession.stream_key
switch (mode) {
case "flv": {
const streamingFLVUri = `${internalMediaServerURI}/live/${streamKey}.flv`
// create a stream pipe response using media server api with axios
const request = await axios.get(streamingFLVUri, {
responseType: "stream"
})
// create a buffer stream from the request
const bufferStream = request.data.pipe(new stream.PassThrough())
// set header for stream response
res.setHeader("Content-Type", "video/x-flv")
// pipe the buffer stream to the response
bufferStream.on("data", (chunk) => {
res.write(chunk)
})
break;
}
case "hls": {
const streamingHLSUri = `${internalMediaServerURI}/live/${streamKey}.m3u8`
// create a stream pipe response using media server api with axios
const request = await axios.get(streamingHLSUri, {
responseType: "stream"
})
// create a buffer stream from the request
const bufferStream = request.data.pipe(new stream.PassThrough())
// set header for stream response
res.setHeader("Content-Type", "application/x-mpegURL")
// pipe the buffer stream to the response
bufferStream.on("data", (chunk) => {
res.write(chunk)
})
break;
}
default: {
return res.status(400).json({
error: "Stream mode not supported"
})
}
}
}
}
}
mediaServerEvents = {
prePublish: async (id, StreamPath, args) => {
// this event is fired before client is published
// here must be some validation (as key validation)
// get session
const session = this.IMediaServer.getSession(id)
// create a userspaced session for the client with containing session
this.Sessions.newSession(id, session)
const streamingKey = getStreamingKeyFromStreamPath(StreamPath)
const streamingUserspace = await StreamingKey.findOne({
key: streamingKey
})
if (!streamingUserspace) {
this.Sessions.removeSession(id)
return false
}
this.Sessions.publishStream({
id,
user_id: streamingUserspace.user_id,
username: streamingUserspace.username,
stream_key: streamingKey
})
},
donePublish: async (id, StreamPath, args) => {
// this event is fired when client has ended the connection
// stop the session
this.Sessions.removeSession(id)
const streamingKey = getStreamingKeyFromStreamPath(StreamPath)
this.Sessions.unpublishStream(streamingKey)
}
}
initialize = async () => {
await this.Db.connect()
await this.IHTTPServer.initialize()
await this.IMediaServer.run()
}
}
new StreamingServer()

View File

@ -0,0 +1,3 @@
export default function getStreamingKeyFromStreamPath(StreamPath) {
return StreamPath.split("/").pop()
}

View File

@ -0,0 +1 @@
export { default as getStreamingKeyFromStreamPath } from "./getStreamingKeyFromStreamPath"

View File

@ -0,0 +1,40 @@
import mongoose from "mongoose"
function parseConnectionString(obj) {
const { db_user, db_driver, db_name, db_pwd, db_hostname, db_port } = obj
return `${db_driver ?? "mongodb"}://${db_user ? `${db_user}` : ""}${db_pwd ? `:${db_pwd}` : ""}${db_user ? "@" : ""}${db_hostname ?? "localhost"}:${db_port ?? ""}/${db_name ?? ""}`
}
export default class DBManager {
constructor() {
this.env = process.env
}
connect = () => {
return new Promise((resolve, reject) => {
try {
console.log("🌐 Trying to connect to DB...")
const dbUri = parseConnectionString(this.env)
//console.log(dbUri)
mongoose.connect(dbUri, {
useNewUrlParser: true,
useUnifiedTopology: true
})
.then((res) => { return resolve(true) })
.catch((err) => { return reject(err) })
} catch (err) {
return reject(err)
}
}).then(done => {
console.log(`✅ Connected to DB`)
}).catch((error) => {
console.log(`❌ Failed to connect to DB, retrying...\n`)
console.log(error)
setTimeout(() => {
this.connect()
}, 1000)
})
}
}

View File

@ -0,0 +1,57 @@
import lodash from "lodash"
export default class SessionsManager {
constructor() {
this.sessions = {}
this.publicStreams = []
}
newSession = (id, session) => {
this.sessions[id] = session
}
getSession = (id) => {
return this.sessions[id]
}
removeSession = (id) => {
this.sessions[id].reject()
delete this.sessions[id]
}
publishStream = (payload) => {
if (typeof payload !== "object") {
throw new Error("Payload must be an object")
}
this.publicStreams.push(payload)
}
unpublishStream = (stream_key) => {
this.publicStreams = this.publicStreams.filter((stream) => stream.stream_key !== stream_key)
}
getPublicStreams = () => {
// return this.publicStreams but without stream_key property
return lodash.map(this.publicStreams, (stream) => {
return lodash.omit(stream, "stream_key")
})
}
getStreamsByUserId = (user_id) => {
const streams = lodash.filter(this.publicStreams, (stream) => stream.user_id === user_id)
return lodash.map(streams, (stream) => {
return lodash.omit(stream, "stream_key")
})
}
getStreamsByUsername = (username) => {
const streams = lodash.filter(this.publicStreams, (stream) => stream.username === username)
return lodash.map(streams, (stream) => {
return lodash.omit(stream, "stream_key")
})
}
}

View File

@ -0,0 +1,2 @@
export { default as DbManager } from "./DbManager"
export { default as SessionsManager } from "./SessionsManager"

View File

@ -0,0 +1,18 @@
import mongoose, { Schema } from "mongoose"
function getSchemas() {
const obj = Object()
const _schemas = require("../schemas")
Object.keys(_schemas).forEach((key) => {
obj[key] = Schema(_schemas[key])
})
return obj
}
const schemas = getSchemas()
// streaming
export const StreamingKey = mongoose.model("StreamingKey", schemas.streamingKey, "streamingKeys")

View File

@ -0,0 +1,14 @@
export default {
username: {
type: String,
required: true,
},
user_id: {
type: String,
required: true,
},
key: {
type: String,
required: true,
}
}

View File

@ -0,0 +1 @@
export { default as streamingKey } from "./streamingKey"