added internal-nms

This commit is contained in:
srgooglo 2022-05-13 15:13:02 +02:00
parent 5971e5bb2c
commit db726c2c61
20 changed files with 5395 additions and 3 deletions

View File

@ -12,8 +12,8 @@ import { Server } from "linebridge/dist/server"
import { SessionsManager, DbManager } from "./managers"
import { getStreamingKeyFromStreamPath, cpu } from "./lib"
import MediaServer from "./nms"
import FlvSession from "./nms/sessionsModels/flv_session"
import MediaServer from "./internal-nms"
import FlvSession from "./internal-nms/sessionsModels/flv_session"
import { StreamingKey } from "./models"

View File

@ -0,0 +1,13 @@
const EventEmitter = require("events")
let sessions = new Map()
let publishers = new Map()
let idlePlayers = new Set()
let nodeEvent = new EventEmitter()
let stat = {
inbytes: 0,
outbytes: 0,
accepted: 0
}
module.exports = { sessions, publishers, idlePlayers, nodeEvent, stat }

View File

@ -0,0 +1,236 @@
//
// Created by Mingliang Chen on 17/8/1.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const lodash = require("lodash")
const os = require("os")
const { cpu } = require("../lib")
const Logger = require("./lib/logger")
const RtmpServer = require("./servers/rtmp_server")
const TransServer = require("./servers/trans_server")
const RelayServer = require("./servers/relay_server")
const FissionServer = require("./servers/fission_server")
const context = require("./ctx")
class MediaServer {
constructor(config) {
this.config = config
this.context = context
}
run() {
Logger.setLogType(this.config.logType)
if (this.config.rtmp) {
this.nrs = new RtmpServer(this.config)
this.nrs.run()
}
if (this.config.trans) {
if (this.config.cluster) {
Logger.log("TransServer does not work in cluster mode")
} else {
this.nts = new TransServer(this.config)
this.nts.run()
}
}
if (this.config.relay) {
if (this.config.cluster) {
Logger.log("RelayServer does not work in cluster mode")
} else {
this.nls = new RelayServer(this.config)
this.nls.run()
}
}
if (this.config.fission) {
if (this.config.cluster) {
Logger.log("FissionServer does not work in cluster mode")
} else {
this.nfs = new FissionServer(this.config)
this.nfs.run()
}
}
process.on("uncaughtException", function (err) {
Logger.error("uncaughtException", err)
})
process.on("SIGINT", function () {
process.exit()
})
}
on = (eventName, listener) => {
context.nodeEvent.on(eventName, listener)
}
stop = () => {
if (this.nrs) {
this.nrs.stop()
}
if (this.nhs) {
this.nhs.stop()
}
if (this.nls) {
this.nls.stop()
}
if (this.nfs) {
this.nfs.stop()
}
}
getSession = (id) => {
return context.sessions.get(id)
}
getSessions = () => {
let stats = {};
this.context.sessions.forEach(function (session, id) {
if (session.isStarting) {
let regRes = /\/(.*)\/(.*)/gi.exec(session.publishStreamPath || session.playStreamPath)
if (regRes === null) {
return
}
let [app, stream] = lodash.slice(regRes, 1)
if (!lodash.get(stats, [app, stream])) {
lodash.setWith(stats, [app, stream], {
publisher: null,
subscribers: []
}, Object)
}
switch (true) {
case session.isPublishing: {
lodash.setWith(stats, [app, stream, "publisher"], {
app: app,
stream: stream,
clientId: session.id,
connectCreated: session.connectTime,
bytes: session.socket.bytesRead,
ip: session.socket.remoteAddress,
audio: session.audioCodec > 0 ? {
codec: session.audioCodecName,
profile: session.audioProfileName,
samplerate: session.audioSamplerate,
channels: session.audioChannels
} : null,
video: session.videoCodec > 0 ? {
codec: session.videoCodecName,
width: session.videoWidth,
height: session.videoHeight,
profile: session.videoProfileName,
level: session.videoLevel,
fps: session.videoFps
} : null,
}, Object)
break;
}
case !!session.playStreamPath: {
switch (session.constructor.name) {
case "NodeRtmpSession": {
stats[app][stream]["subscribers"].push({
app: app,
stream: stream,
clientId: session.id,
connectCreated: session.connectTime,
bytes: session.socket.bytesWritten,
ip: session.socket.remoteAddress,
protocol: "rtmp"
})
break
}
case "NodeFlvSession": {
stats[app][stream]["subscribers"].push({
app: app,
stream: stream,
clientId: session.id,
connectCreated: session.connectTime,
bytes: session.req.connection.bytesWritten,
ip: session.req.connection.remoteAddress,
protocol: session.TAG === "websocket-flv" ? "ws" : "http"
})
break
}
}
break
}
}
}
})
return stats
}
getSessionsInfo = () => {
let info = {
inbytes: 0,
outbytes: 0,
rtmp: 0,
http: 0,
ws: 0,
}
for (let session of this.context.sessions.values()) {
if (session.TAG === "relay") {
continue
}
let socket = session.TAG === "rtmp" ? session.socket : session.req.socket
info.inbytes += socket.bytesRead
info.outbytes += socket.bytesWritten
info.rtmp += session.TAG === "rtmp" ? 1 : 0
info.http += session.TAG === "http-flv" ? 1 : 0
info.ws += session.TAG === "websocket-flv" ? 1 : 0
}
return info
}
getServerStatus = async () => {
const cpuPercentageUsage = await cpu.percentageUsage()
const sessionsInfo = this.getSessionsInfo()
return {
os: {
arch: os.arch(),
platform: os.platform(),
release: os.release(),
},
cpu: {
num: os.cpus().length,
load: cpuPercentageUsage,
model: os.cpus()[0].model,
speed: os.cpus()[0].speed,
},
net: {
inbytes: this.context.stat.inbytes + sessionsInfo.inbytes,
outbytes: this.context.stat.outbytes + sessionsInfo.outbytes,
},
mem: {
totle: os.totalmem(),
free: os.freemem()
},
nodejs: {
uptime: Math.floor(process.uptime()),
version: process.version,
mem: process.memoryUsage()
},
}
}
}
module.exports = MediaServer

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,515 @@
//
// Created by Mingliang Chen on 17/12/21.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const Bitop = require("./bitop")
const AAC_SAMPLE_RATE = [
96000, 88200, 64000, 48000,
44100, 32000, 24000, 22050,
16000, 12000, 11025, 8000,
7350, 0, 0, 0
]
const AAC_CHANNELS = [
0, 1, 2, 3, 4, 5, 6, 8
]
const AUDIO_CODEC_NAME = [
"",
"ADPCM",
"MP3",
"LinearLE",
"Nellymoser16",
"Nellymoser8",
"Nellymoser",
"G711A",
"G711U",
"",
"AAC",
"Speex",
"",
"OPUS",
"MP3-8K",
"DeviceSpecific",
"Uncompressed"
]
const AUDIO_SOUND_RATE = [
5512, 11025, 22050, 44100
]
const VIDEO_CODEC_NAME = [
"",
"Jpeg",
"Sorenson-H263",
"ScreenVideo",
"On2-VP6",
"On2-VP6-Alpha",
"ScreenVideo2",
"H264",
"",
"",
"",
"",
"H265"
]
function getObjectType(bitop) {
let audioObjectType = bitop.read(5)
if (audioObjectType === 31) {
audioObjectType = bitop.read(6) + 32
}
return audioObjectType
}
function getSampleRate(bitop, info) {
info.sampling_index = bitop.read(4)
return info.sampling_index == 0x0f ? bitop.read(24) : AAC_SAMPLE_RATE[info.sampling_index]
}
function readAACSpecificConfig(aacSequenceHeader) {
let info = {}
let bitop = new Bitop(aacSequenceHeader)
bitop.read(16)
info.object_type = getObjectType(bitop)
info.sample_rate = getSampleRate(bitop, info)
info.chan_config = bitop.read(4)
if (info.chan_config < AAC_CHANNELS.length) {
info.channels = AAC_CHANNELS[info.chan_config]
}
info.sbr = -1
info.ps = -1
if (info.object_type == 5 || info.object_type == 29) {
if (info.object_type == 29) {
info.ps = 1
}
info.ext_object_type = 5
info.sbr = 1
info.sample_rate = getSampleRate(bitop, info)
info.object_type = getObjectType(bitop)
}
return info
}
function getAACProfileName(info) {
switch (info.object_type) {
case 1:
return "Main"
case 2:
if (info.ps > 0) {
return "HEv2"
}
if (info.sbr > 0) {
return "HE"
}
return "LC"
case 3:
return "SSR"
case 4:
return "LTP"
case 5:
return "SBR"
default:
return ""
}
}
function readH264SpecificConfig(avcSequenceHeader) {
let info = {}
let profile_idc, width, height, crop_left, crop_right,
crop_top, crop_bottom, frame_mbs_only, n, cf_idc,
num_ref_frames
let bitop = new Bitop(avcSequenceHeader)
bitop.read(48)
info.width = 0
info.height = 0
do {
info.profile = bitop.read(8)
info.compat = bitop.read(8)
info.level = bitop.read(8)
info.nalu = (bitop.read(8) & 0x03) + 1
info.nb_sps = bitop.read(8) & 0x1F
if (info.nb_sps == 0) {
break
}
/* nal size */
bitop.read(16)
/* nal type */
if (bitop.read(8) != 0x67) {
break
}
/* SPS */
profile_idc = bitop.read(8)
/* flags */
bitop.read(8)
/* level idc */
bitop.read(8)
/* SPS id */
bitop.read_golomb()
if (profile_idc == 100 || profile_idc == 110 ||
profile_idc == 122 || profile_idc == 244 || profile_idc == 44 ||
profile_idc == 83 || profile_idc == 86 || profile_idc == 118) {
/* chroma format idc */
cf_idc = bitop.read_golomb()
if (cf_idc == 3) {
/* separate color plane */
bitop.read(1)
}
/* bit depth luma - 8 */
bitop.read_golomb()
/* bit depth chroma - 8 */
bitop.read_golomb()
/* qpprime y zero transform bypass */
bitop.read(1)
/* seq scaling matrix present */
if (bitop.read(1)) {
for (n = 0; n < (cf_idc != 3 ? 8 : 12); n++) {
/* seq scaling list present */
if (bitop.read(1)) {
/* TODO: scaling_list()
if (n < 6) {
} else {
}
*/
}
}
}
}
/* log2 max frame num */
bitop.read_golomb()
/* pic order cnt type */
switch (bitop.read_golomb()) {
case 0:
/* max pic order cnt */
bitop.read_golomb()
break
case 1:
/* delta pic order alwys zero */
bitop.read(1)
/* offset for non-ref pic */
bitop.read_golomb()
/* offset for top to bottom field */
bitop.read_golomb()
/* num ref frames in pic order */
num_ref_frames = bitop.read_golomb()
for (n = 0; n < num_ref_frames; n++) {
/* offset for ref frame */
bitop.read_golomb()
}
}
/* num ref frames */
info.avc_ref_frames = bitop.read_golomb()
/* gaps in frame num allowed */
bitop.read(1)
/* pic width in mbs - 1 */
width = bitop.read_golomb()
/* pic height in map units - 1 */
height = bitop.read_golomb()
/* frame mbs only flag */
frame_mbs_only = bitop.read(1)
if (!frame_mbs_only) {
/* mbs adaprive frame field */
bitop.read(1)
}
/* direct 8x8 inference flag */
bitop.read(1)
/* frame cropping */
if (bitop.read(1)) {
crop_left = bitop.read_golomb()
crop_right = bitop.read_golomb()
crop_top = bitop.read_golomb()
crop_bottom = bitop.read_golomb()
} else {
crop_left = 0
crop_right = 0
crop_top = 0
crop_bottom = 0
}
info.level = info.level / 10.0
info.width = (width + 1) * 16 - (crop_left + crop_right) * 2
info.height = (2 - frame_mbs_only) * (height + 1) * 16 - (crop_top + crop_bottom) * 2
} while (0)
return info
}
function HEVCParsePtl(bitop, hevc, max_sub_layers_minus1) {
let general_ptl = {}
general_ptl.profile_space = bitop.read(2)
general_ptl.tier_flag = bitop.read(1)
general_ptl.profile_idc = bitop.read(5)
general_ptl.profile_compatibility_flags = bitop.read(32)
general_ptl.general_progressive_source_flag = bitop.read(1)
general_ptl.general_interlaced_source_flag = bitop.read(1)
general_ptl.general_non_packed_constraint_flag = bitop.read(1)
general_ptl.general_frame_only_constraint_flag = bitop.read(1)
bitop.read(32)
bitop.read(12)
general_ptl.level_idc = bitop.read(8)
general_ptl.sub_layer_profile_present_flag = []
general_ptl.sub_layer_level_present_flag = []
for (let i = 0; i < max_sub_layers_minus1; i++) {
general_ptl.sub_layer_profile_present_flag[i] = bitop.read(1)
general_ptl.sub_layer_level_present_flag[i] = bitop.read(1)
}
if (max_sub_layers_minus1 > 0) {
for (let i = max_sub_layers_minus1; i < 8; i++) {
bitop.read(2)
}
}
general_ptl.sub_layer_profile_space = []
general_ptl.sub_layer_tier_flag = []
general_ptl.sub_layer_profile_idc = []
general_ptl.sub_layer_profile_compatibility_flag = []
general_ptl.sub_layer_progressive_source_flag = []
general_ptl.sub_layer_interlaced_source_flag = []
general_ptl.sub_layer_non_packed_constraint_flag = []
general_ptl.sub_layer_frame_only_constraint_flag = []
general_ptl.sub_layer_level_idc = []
for (let i = 0; i < max_sub_layers_minus1; i++) {
if (general_ptl.sub_layer_profile_present_flag[i]) {
general_ptl.sub_layer_profile_space[i] = bitop.read(2)
general_ptl.sub_layer_tier_flag[i] = bitop.read(1)
general_ptl.sub_layer_profile_idc[i] = bitop.read(5)
general_ptl.sub_layer_profile_compatibility_flag[i] = bitop.read(32)
general_ptl.sub_layer_progressive_source_flag[i] = bitop.read(1)
general_ptl.sub_layer_interlaced_source_flag[i] = bitop.read(1)
general_ptl.sub_layer_non_packed_constraint_flag[i] = bitop.read(1)
general_ptl.sub_layer_frame_only_constraint_flag[i] = bitop.read(1)
bitop.read(32)
bitop.read(12)
}
if (general_ptl.sub_layer_level_present_flag[i]) {
general_ptl.sub_layer_level_idc[i] = bitop.read(8)
}
else {
general_ptl.sub_layer_level_idc[i] = 1
}
}
return general_ptl
}
function HEVCParseSPS(SPS, hevc) {
let psps = {}
let NumBytesInNALunit = SPS.length
let NumBytesInRBSP = 0
let rbsp_array = []
let bitop = new Bitop(SPS)
bitop.read(1)//forbidden_zero_bit
bitop.read(6)//nal_unit_type
bitop.read(6)//nuh_reserved_zero_6bits
bitop.read(3)//nuh_temporal_id_plus1
for (let i = 2; i < NumBytesInNALunit; i++) {
if (i + 2 < NumBytesInNALunit && bitop.look(24) == 0x000003) {
rbsp_array.push(bitop.read(8))
rbsp_array.push(bitop.read(8))
i += 2
let emulation_prevention_three_byte = bitop.read(8) /* equal to 0x03 */
} else {
rbsp_array.push(bitop.read(8))
}
}
let rbsp = Buffer.from(rbsp_array)
let rbspBitop = new Bitop(rbsp)
psps.sps_video_parameter_set_id = rbspBitop.read(4)
psps.sps_max_sub_layers_minus1 = rbspBitop.read(3)
psps.sps_temporal_id_nesting_flag = rbspBitop.read(1)
psps.profile_tier_level = HEVCParsePtl(rbspBitop, hevc, psps.sps_max_sub_layers_minus1)
psps.sps_seq_parameter_set_id = rbspBitop.read_golomb()
psps.chroma_format_idc = rbspBitop.read_golomb()
if (psps.chroma_format_idc == 3) {
psps.separate_colour_plane_flag = rbspBitop.read(1)
} else {
psps.separate_colour_plane_flag = 0
}
psps.pic_width_in_luma_samples = rbspBitop.read_golomb()
psps.pic_height_in_luma_samples = rbspBitop.read_golomb()
psps.conformance_window_flag = rbspBitop.read(1)
if (psps.conformance_window_flag) {
let vert_mult = 1 + (psps.chroma_format_idc < 2)
let horiz_mult = 1 + (psps.chroma_format_idc < 3)
psps.conf_win_left_offset = rbspBitop.read_golomb() * horiz_mult
psps.conf_win_right_offset = rbspBitop.read_golomb() * horiz_mult
psps.conf_win_top_offset = rbspBitop.read_golomb() * vert_mult
psps.conf_win_bottom_offset = rbspBitop.read_golomb() * vert_mult
}
// Logger.debug(psps)
return psps
}
function readHEVCSpecificConfig(hevcSequenceHeader) {
let info = {}
info.width = 0
info.height = 0
info.profile = 0
info.level = 0
// let bitop = new Bitop(hevcSequenceHeader)
// bitop.read(48)
hevcSequenceHeader = hevcSequenceHeader.slice(5)
do {
let hevc = {}
if (hevcSequenceHeader.length < 23) {
break
}
hevc.configurationVersion = hevcSequenceHeader[0]
if (hevc.configurationVersion != 1) {
break
}
hevc.general_profile_space = (hevcSequenceHeader[1] >> 6) & 0x03
hevc.general_tier_flag = (hevcSequenceHeader[1] >> 5) & 0x01
hevc.general_profile_idc = hevcSequenceHeader[1] & 0x1F
hevc.general_profile_compatibility_flags = (hevcSequenceHeader[2] << 24) | (hevcSequenceHeader[3] << 16) | (hevcSequenceHeader[4] << 8) | hevcSequenceHeader[5]
hevc.general_constraint_indicator_flags = ((hevcSequenceHeader[6] << 24) | (hevcSequenceHeader[7] << 16) | (hevcSequenceHeader[8] << 8) | hevcSequenceHeader[9])
hevc.general_constraint_indicator_flags = (hevc.general_constraint_indicator_flags << 16) | (hevcSequenceHeader[10] << 8) | hevcSequenceHeader[11]
hevc.general_level_idc = hevcSequenceHeader[12]
hevc.min_spatial_segmentation_idc = ((hevcSequenceHeader[13] & 0x0F) << 8) | hevcSequenceHeader[14]
hevc.parallelismType = hevcSequenceHeader[15] & 0x03
hevc.chromaFormat = hevcSequenceHeader[16] & 0x03
hevc.bitDepthLumaMinus8 = hevcSequenceHeader[17] & 0x07
hevc.bitDepthChromaMinus8 = hevcSequenceHeader[18] & 0x07
hevc.avgFrameRate = (hevcSequenceHeader[19] << 8) | hevcSequenceHeader[20]
hevc.constantFrameRate = (hevcSequenceHeader[21] >> 6) & 0x03
hevc.numTemporalLayers = (hevcSequenceHeader[21] >> 3) & 0x07
hevc.temporalIdNested = (hevcSequenceHeader[21] >> 2) & 0x01
hevc.lengthSizeMinusOne = hevcSequenceHeader[21] & 0x03
let numOfArrays = hevcSequenceHeader[22]
let p = hevcSequenceHeader.slice(23)
for (let i = 0; i < numOfArrays; i++) {
if (p.length < 3) {
brak
}
let nalutype = p[0]
let n = (p[1]) << 8 | p[2]
// Logger.debug(nalutype, n)
p = p.slice(3)
for (let j = 0; j < n; j++) {
if (p.length < 2) {
break
}
let k = (p[0] << 8) | p[1]
// Logger.debug("k", k)
if (p.length < 2 + k) {
break
}
p = p.slice(2)
if (nalutype == 33) {
//SPS
let sps = Buffer.alloc(k)
p.copy(sps, 0, 0, k)
// Logger.debug(sps, sps.length)
hevc.psps = HEVCParseSPS(sps, hevc)
info.profile = hevc.general_profile_idc
info.level = hevc.general_level_idc / 30.0
info.width = hevc.psps.pic_width_in_luma_samples - (hevc.psps.conf_win_left_offset + hevc.psps.conf_win_right_offset)
info.height = hevc.psps.pic_height_in_luma_samples - (hevc.psps.conf_win_top_offset + hevc.psps.conf_win_bottom_offset)
}
p = p.slice(k)
}
}
} while (0)
return info
}
function readAVCSpecificConfig(avcSequenceHeader) {
let codec_id = avcSequenceHeader[0] & 0x0f
if (codec_id == 7) {
return readH264SpecificConfig(avcSequenceHeader)
} else if (codec_id == 12) {
return readHEVCSpecificConfig(avcSequenceHeader)
}
}
function getAVCProfileName(info) {
switch (info.profile) {
case 1:
return "Main"
case 2:
return "Main 10"
case 3:
return "Main Still Picture"
case 66:
return "Baseline"
case 77:
return "Main"
case 100:
return "High"
default:
return ""
}
}
module.exports = {
AUDIO_SOUND_RATE,
AUDIO_CODEC_NAME,
VIDEO_CODEC_NAME,
readAACSpecificConfig,
getAACProfileName,
readAVCSpecificConfig,
getAVCProfileName,
}

View File

@ -0,0 +1,53 @@
class Bitop {
constructor(buffer) {
this.buffer = buffer;
this.buflen = buffer.length;
this.bufpos = 0;
this.bufoff = 0;
this.iserro = false;
}
read(n) {
let v = 0;
let d = 0;
while (n) {
if (n < 0 || this.bufpos >= this.buflen) {
this.iserro = true;
return 0;
}
this.iserro = false;
d = this.bufoff + n > 8 ? 8 - this.bufoff : n;
v <<= d;
v += (this.buffer[this.bufpos] >> (8 - this.bufoff - d)) & (0xff >> (8 - d));
this.bufoff += d;
n -= d;
if (this.bufoff == 8) {
this.bufpos++;
this.bufoff = 0;
}
}
return v;
}
look(n) {
let p = this.bufpos;
let o = this.bufoff;
let v = this.read(n);
this.bufpos = p;
this.bufoff = o;
return v;
}
read_golomb() {
let n;
for (n = 0; this.read(1) == 0 && !this.iserro; n++);
return (1 << n) + this.read(n) - 1;
}
}
module.exports = Bitop;

View File

@ -0,0 +1,53 @@
const chalk = require("chalk")
const LOG_TYPES = {
NONE: 0,
ERROR: 1,
NORMAL: 2,
DEBUG: 3,
FFDEBUG: 4
}
let logType = LOG_TYPES.NORMAL
const setLogType = (type) => {
if (typeof type !== "number") return
logType = type
}
const logTime = () => {
let nowDate = new Date()
return nowDate.toLocaleDateString() + " " + nowDate.toLocaleTimeString([], { hour12: false })
}
const log = (...args) => {
if (logType < LOG_TYPES.NORMAL) return
console.log(logTime(), process.pid, chalk.bold.green("[INFO]"), ...args)
}
const error = (...args) => {
if (logType < LOG_TYPES.ERROR) return
console.log(logTime(), process.pid, chalk.bold.red("[ERROR]"), ...args)
}
const debug = (...args) => {
if (logType < LOG_TYPES.DEBUG) return
console.log(logTime(), process.pid, chalk.bold.blue("[DEBUG]"), ...args)
}
const ffdebug = (...args) => {
if (logType < LOG_TYPES.FFDEBUG) return
console.log(logTime(), process.pid, chalk.bold.blue("[FFDEBUG]"), ...args)
}
module.exports = {
LOG_TYPES,
setLogType,
log, error, debug, ffdebug
}

View File

@ -0,0 +1,106 @@
//
// Created by Mingliang Chen on 17/8/23.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const Crypto = require("crypto")
const { spawn } = require("child_process")
const context = require("../ctx")
function generateNewSessionID() {
let sessionID = ""
const possible = "ABCDEFGHIJKLMNOPQRSTUVWKYZ0123456789"
const numPossible = possible.length
do {
for (let i = 0; i < 8; i++) {
sessionID += possible.charAt((Math.random() * numPossible) | 0)
}
} while (context.sessions.has(sessionID))
return sessionID
}
function genRandomName() {
let name = ""
const possible = "abcdefghijklmnopqrstuvwxyz0123456789"
const numPossible = possible.length
for (let i = 0; i < 4; i++) {
name += possible.charAt((Math.random() * numPossible) | 0)
}
return name
}
function verifyAuth(signStr, streamId, secretKey) {
if (signStr === undefined) {
return false
}
let now = Date.now() / 1000 | 0
let exp = parseInt(signStr.split("-")[0])
let shv = signStr.split("-")[1]
let str = streamId + "-" + exp + "-" + secretKey
if (exp < now) {
return false
}
let md5 = Crypto.createHash("md5")
let ohv = md5.update(str).digest("hex")
return shv === ohv
}
function getFFmpegVersion(ffpath) {
return new Promise((resolve, reject) => {
let ffmpeg_exec = spawn(ffpath, ["-version"])
let version = ""
ffmpeg_exec.on("error", (e) => {
reject(e)
})
ffmpeg_exec.stdout.on("data", (data) => {
try {
version = data.toString().split(/(?:\r\n|\r|\n)/g)[0].split("\ ")[2]
} catch (e) {
}
})
ffmpeg_exec.on("close", (code) => {
resolve(version)
})
})
}
function getFFmpegUrl() {
let url = ""
switch (process.platform) {
case "darwin":
url = "https://ffmpeg.zeranoe.com/builds/macos64/static/ffmpeg-latest-macos64-static.zip"
break
case "win32":
url = "https://ffmpeg.zeranoe.com/builds/win64/static/ffmpeg-latest-win64-static.zip | https://ffmpeg.zeranoe.com/builds/win32/static/ffmpeg-latest-win32-static.zip"
break
case "linux":
url = "https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz | https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-i686-static.tar.xz"
break
default:
url = "http://ffmpeg.org/download.html"
break
}
return url
}
module.exports = {
generateNewSessionID,
verifyAuth,
genRandomName,
getFFmpegVersion,
getFFmpegUrl
}

View File

@ -0,0 +1,793 @@
//
// Created by Mingliang Chen on 18/6/21.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const EventEmitter = require("events")
const Crypto = require("crypto")
const Url = require("url")
const Net = require("net")
const AMF = require("./lib/amf_rules")
const Logger = require("./lib/logger")
const FLASHVER = "LNX 9,0,124,2"
const RTMP_OUT_CHUNK_SIZE = 60000
const RTMP_PORT = 1935
const RTMP_HANDSHAKE_SIZE = 1536
const RTMP_HANDSHAKE_UNINIT = 0
const RTMP_HANDSHAKE_0 = 1
const RTMP_HANDSHAKE_1 = 2
const RTMP_HANDSHAKE_2 = 3
const RTMP_PARSE_INIT = 0
const RTMP_PARSE_BASIC_HEADER = 1
const RTMP_PARSE_MESSAGE_HEADER = 2
const RTMP_PARSE_EXTENDED_TIMESTAMP = 3
const RTMP_PARSE_PAYLOAD = 4
const RTMP_CHUNK_HEADER_MAX = 18
const RTMP_CHUNK_TYPE_0 = 0 // 11-bytes: timestamp(3) + length(3) + stream type(1) + stream id(4)
const RTMP_CHUNK_TYPE_1 = 1 // 7-bytes: delta(3) + length(3) + stream type(1)
const RTMP_CHUNK_TYPE_2 = 2 // 3-bytes: delta(3)
const RTMP_CHUNK_TYPE_3 = 3 // 0-byte
const RTMP_CHANNEL_PROTOCOL = 2
const RTMP_CHANNEL_INVOKE = 3
const RTMP_CHANNEL_AUDIO = 4
const RTMP_CHANNEL_VIDEO = 5
const RTMP_CHANNEL_DATA = 6
const rtmpHeaderSize = [11, 7, 3, 0]
/* Protocol Control Messages */
const RTMP_TYPE_SET_CHUNK_SIZE = 1
const RTMP_TYPE_ABORT = 2
const RTMP_TYPE_ACKNOWLEDGEMENT = 3 // bytes read report
const RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE = 5 // server bandwidth
const RTMP_TYPE_SET_PEER_BANDWIDTH = 6 // client bandwidth
/* User Control Messages Event (4) */
const RTMP_TYPE_EVENT = 4
const RTMP_TYPE_AUDIO = 8
const RTMP_TYPE_VIDEO = 9
/* Data Message */
const RTMP_TYPE_FLEX_STREAM = 15 // AMF3
const RTMP_TYPE_DATA = 18 // AMF0
/* Shared Object Message */
const RTMP_TYPE_FLEX_OBJECT = 16 // AMF3
const RTMP_TYPE_SHARED_OBJECT = 19 // AMF0
/* Command Message */
const RTMP_TYPE_FLEX_MESSAGE = 17 // AMF3
const RTMP_TYPE_INVOKE = 20 // AMF0
/* Aggregate Message */
const RTMP_TYPE_METADATA = 22
const RTMP_CHUNK_SIZE = 128
const RTMP_PING_TIME = 60000
const RTMP_PING_TIMEOUT = 30000
const STREAM_BEGIN = 0x00
const STREAM_EOF = 0x01
const STREAM_DRY = 0x02
const STREAM_EMPTY = 0x1f
const STREAM_READY = 0x20
const RTMP_TRANSACTION_CONNECT = 1
const RTMP_TRANSACTION_CREATE_STREAM = 2
const RTMP_TRANSACTION_GET_STREAM_LENGTH = 3
const RtmpPacket = {
create: (fmt = 0, cid = 0) => {
return {
header: {
fmt: fmt,
cid: cid,
timestamp: 0,
length: 0,
type: 0,
stream_id: 0
},
clock: 0,
delta: 0,
payload: null,
capacity: 0,
bytes: 0
}
}
}
class NodeRtmpClient {
constructor(rtmpUrl) {
this.url = rtmpUrl
this.info = this.rtmpUrlParser(rtmpUrl)
this.isPublish = false
this.launcher = new EventEmitter()
this.handshakePayload = Buffer.alloc(RTMP_HANDSHAKE_SIZE)
this.handshakeState = RTMP_HANDSHAKE_UNINIT
this.handshakeBytes = 0
this.parserBuffer = Buffer.alloc(RTMP_CHUNK_HEADER_MAX)
this.parserState = RTMP_PARSE_INIT
this.parserBytes = 0
this.parserBasicBytes = 0
this.parserPacket = null
this.inPackets = new Map()
this.inChunkSize = RTMP_CHUNK_SIZE
this.outChunkSize = RTMP_CHUNK_SIZE
this.streamId = 0
this.isSocketOpen = false
}
onSocketData(data) {
let bytes = data.length
let p = 0
let n = 0
while (bytes > 0) {
switch (this.handshakeState) {
case RTMP_HANDSHAKE_UNINIT:
// read s0
// Logger.debug("[rtmp client] read s0")
this.handshakeState = RTMP_HANDSHAKE_0
this.handshakeBytes = 0
bytes -= 1
p += 1
break
case RTMP_HANDSHAKE_0:
// read s1
n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes
n = n <= bytes ? n : bytes
data.copy(this.handshakePayload, this.handshakeBytes, p, p + n)
this.handshakeBytes += n
bytes -= n
p += n
if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
// Logger.debug("[rtmp client] read s1")
this.handshakeState = RTMP_HANDSHAKE_1
this.handshakeBytes = 0
this.socket.write(this.handshakePayload)// write c2
// Logger.debug("[rtmp client] write c2")
}
break
case RTMP_HANDSHAKE_1:
//read s2
n = RTMP_HANDSHAKE_SIZE - this.handshakeBytes
n = n <= bytes ? n : bytes
data.copy(this.handshakePayload, this.handshakeBytes, p, n)
this.handshakeBytes += n
bytes -= n
p += n
if (this.handshakeBytes === RTMP_HANDSHAKE_SIZE) {
// Logger.debug("[rtmp client] read s2")
this.handshakeState = RTMP_HANDSHAKE_2
this.handshakeBytes = 0
this.handshakePayload = null
this.rtmpSendConnect()
}
break
case RTMP_HANDSHAKE_2:
return this.rtmpChunkRead(data, p, bytes)
}
}
}
onSocketError(e) {
Logger.error("rtmp_client", "onSocketError", e)
this.isSocketOpen = false
this.stop()
}
onSocketClose() {
// Logger.debug("rtmp_client", "onSocketClose")
this.isSocketOpen = false
this.stop()
}
onSocketTimeout() {
// Logger.debug("rtmp_client", "onSocketTimeout")
this.isSocketOpen = false
this.stop()
}
on(event, callback) {
this.launcher.on(event, callback)
}
startPull() {
this._start()
}
startPush() {
this.isPublish = true
this._start()
}
_start() {
this.socket = Net.createConnection(this.info.port, this.info.hostname, () => {
//rtmp handshark c0c1
let c0c1 = Crypto.randomBytes(1537)
c0c1.writeUInt8(3)
c0c1.writeUInt32BE(Date.now() / 1000, 1)
c0c1.writeUInt32BE(0, 5)
this.socket.write(c0c1)
// Logger.debug("[rtmp client] write c0c1")
})
this.socket.on("data", this.onSocketData.bind(this))
this.socket.on("error", this.onSocketError.bind(this))
this.socket.on("close", this.onSocketClose.bind(this))
this.socket.on("timeout", this.onSocketTimeout.bind(this))
this.socket.setTimeout(60000)
}
stop() {
if (this.streamId > 0) {
if (!this.socket.destroyed) {
if (this.isPublish) {
this.rtmpSendFCUnpublish()
}
this.rtmpSendDeleteStream()
this.socket.destroy()
}
this.streamId = 0
this.launcher.emit("close")
}
}
pushAudio(audioData, timestamp) {
if (this.streamId == 0) return
let packet = RtmpPacket.create()
packet.header.fmt = RTMP_CHUNK_TYPE_0
packet.header.cid = RTMP_CHANNEL_AUDIO
packet.header.type = RTMP_TYPE_AUDIO
packet.payload = audioData
packet.header.length = packet.payload.length
packet.header.timestamp = timestamp
let rtmpChunks = this.rtmpChunksCreate(packet)
this.socket.write(rtmpChunks)
}
pushVideo(videoData, timestamp) {
if (this.streamId == 0) return
let packet = RtmpPacket.create()
packet.header.fmt = RTMP_CHUNK_TYPE_0
packet.header.cid = RTMP_CHANNEL_VIDEO
packet.header.type = RTMP_TYPE_VIDEO
packet.payload = videoData
packet.header.length = packet.payload.length
packet.header.timestamp = timestamp
let rtmpChunks = this.rtmpChunksCreate(packet)
this.socket.write(rtmpChunks)
}
pushScript(scriptData, timestamp) {
if (this.streamId == 0) return
let packet = RtmpPacket.create()
packet.header.fmt = RTMP_CHUNK_TYPE_0
packet.header.cid = RTMP_CHANNEL_DATA
packet.header.type = RTMP_TYPE_DATA
packet.payload = scriptData
packet.header.length = packet.payload.length
packet.header.timestamp = timestamp
let rtmpChunks = this.rtmpChunksCreate(packet)
this.socket.write(rtmpChunks)
}
rtmpUrlParser(url) {
let urlInfo = Url.parse(url, true)
urlInfo.app = urlInfo.path.split("/")[1]
urlInfo.port = urlInfo.port ? urlInfo.port : RTMP_PORT
urlInfo.tcurl = urlInfo.href.match(/rtmp:\/\/([^\/]+)\/([^\/]+)/)[0]
urlInfo.stream = urlInfo.path.slice(urlInfo.app.length + 2)
return urlInfo
}
rtmpChunkBasicHeaderCreate(fmt, cid) {
let out
if (cid >= 64 + 255) {
out = Buffer.alloc(3)
out[0] = (fmt << 6) | 1
out[1] = (cid - 64) & 0xFF
out[2] = ((cid - 64) >> 8) & 0xFF
} else if (cid >= 64) {
out = Buffer.alloc(2)
out[0] = (fmt << 6) | 0
out[1] = (cid - 64) & 0xFF
} else {
out = Buffer.alloc(1)
out[0] = (fmt << 6) | cid
}
return out
}
rtmpChunkMessageHeaderCreate(header) {
let out = Buffer.alloc(rtmpHeaderSize[header.fmt % 4])
if (header.fmt <= RTMP_CHUNK_TYPE_2) {
out.writeUIntBE(header.timestamp >= 0xffffff ? 0xffffff : header.timestamp, 0, 3)
}
if (header.fmt <= RTMP_CHUNK_TYPE_1) {
out.writeUIntBE(header.length, 3, 3)
out.writeUInt8(header.type, 6)
}
if (header.fmt === RTMP_CHUNK_TYPE_0) {
out.writeUInt32LE(header.stream_id, 7)
}
return out
}
rtmpChunksCreate(packet) {
let header = packet.header
let payload = packet.payload
let payloadSize = header.length
let chunkSize = this.outChunkSize
let chunksOffset = 0
let payloadOffset = 0
let chunkBasicHeader = this.rtmpChunkBasicHeaderCreate(header.fmt, header.cid)
let chunkBasicHeader3 = this.rtmpChunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid)
let chunkMessageHeader = this.rtmpChunkMessageHeaderCreate(header)
let useExtendedTimestamp = header.timestamp >= 0xffffff
let headerSize = chunkBasicHeader.length + chunkMessageHeader.length + (useExtendedTimestamp ? 4 : 0)
let n = headerSize + payloadSize + Math.floor(payloadSize / chunkSize)
if (useExtendedTimestamp) {
n += Math.floor(payloadSize / chunkSize) * 4
}
if (!(payloadSize % chunkSize)) {
n -= 1
if (useExtendedTimestamp) { //TODO CHECK
n -= 4
}
}
let chunks = Buffer.alloc(n)
chunkBasicHeader.copy(chunks, chunksOffset)
chunksOffset += chunkBasicHeader.length
chunkMessageHeader.copy(chunks, chunksOffset)
chunksOffset += chunkMessageHeader.length
if (useExtendedTimestamp) {
chunks.writeUInt32BE(header.timestamp, chunksOffset)
chunksOffset += 4
}
while (payloadSize > 0) {
if (payloadSize > chunkSize) {
payload.copy(chunks, chunksOffset, payloadOffset, payloadOffset + chunkSize)
payloadSize -= chunkSize
chunksOffset += chunkSize
payloadOffset += chunkSize
chunkBasicHeader3.copy(chunks, chunksOffset)
chunksOffset += chunkBasicHeader3.length
if (useExtendedTimestamp) {
chunks.writeUInt32BE(header.timestamp, chunksOffset)
chunksOffset += 4
}
} else {
payload.copy(chunks, chunksOffset, payloadOffset, payloadOffset + payloadSize)
payloadSize -= payloadSize
chunksOffset += payloadSize
payloadOffset += payloadSize
}
}
return chunks
}
rtmpChunkRead(data, p, bytes) {
let size = 0
let offset = 0
let extended_timestamp = 0
while (offset < bytes) {
switch (this.parserState) {
case RTMP_PARSE_INIT:
this.parserBytes = 1
this.parserBuffer[0] = data[p + offset++]
if (0 === (this.parserBuffer[0] & 0x3F)) {
this.parserBasicBytes = 2
} else if (1 === (this.parserBuffer[0] & 0x3F)) {
this.parserBasicBytes = 3
} else {
this.parserBasicBytes = 1
}
this.parserState = RTMP_PARSE_BASIC_HEADER
break
case RTMP_PARSE_BASIC_HEADER:
while (this.parserBytes < this.parserBasicBytes && offset < bytes) {
this.parserBuffer[this.parserBytes++] = data[p + offset++]
}
if (this.parserBytes >= this.parserBasicBytes) {
this.parserState = RTMP_PARSE_MESSAGE_HEADER
}
break
case RTMP_PARSE_MESSAGE_HEADER:
size = rtmpHeaderSize[this.parserBuffer[0] >> 6] + this.parserBasicBytes
while (this.parserBytes < size && offset < bytes) {
this.parserBuffer[this.parserBytes++] = data[p + offset++]
}
if (this.parserBytes >= size) {
this.rtmpPacketParse()
this.parserState = RTMP_PARSE_EXTENDED_TIMESTAMP
}
break
case RTMP_PARSE_EXTENDED_TIMESTAMP:
size = rtmpHeaderSize[this.parserPacket.header.fmt] + this.parserBasicBytes
if (this.parserPacket.header.timestamp === 0xFFFFFF) size += 4
while (this.parserBytes < size && offset < bytes) {
this.parserBuffer[this.parserBytes++] = data[p + offset++]
}
if (this.parserBytes >= size) {
if (this.parserPacket.header.timestamp === 0xFFFFFF) {
extended_timestamp = this.parserBuffer.readUInt32BE(rtmpHeaderSize[this.parserPacket.header.fmt] + this.parserBasicBytes)
}
if (0 === this.parserPacket.bytes) {
if (RTMP_CHUNK_TYPE_0 === this.parserPacket.header.fmt) {
this.parserPacket.clock = 0xFFFFFF === this.parserPacket.header.timestamp ? extended_timestamp : this.parserPacket.header.timestamp
this.parserPacket.delta = 0
} else {
this.parserPacket.delta = 0xFFFFFF === this.parserPacket.header.timestamp ? extended_timestamp : this.parserPacket.header.timestamp
}
this.rtmpPacketAlloc()
}
this.parserState = RTMP_PARSE_PAYLOAD
}
break
case RTMP_PARSE_PAYLOAD:
size = Math.min(this.inChunkSize - (this.parserPacket.bytes % this.inChunkSize), this.parserPacket.header.length - this.parserPacket.bytes)
size = Math.min(size, bytes - offset)
if (size > 0) {
data.copy(this.parserPacket.payload, this.parserPacket.bytes, p + offset, p + offset + size)
}
this.parserPacket.bytes += size
offset += size
if (this.parserPacket.bytes >= this.parserPacket.header.length) {
this.parserState = RTMP_PARSE_INIT
this.parserPacket.bytes = 0
this.parserPacket.clock += this.parserPacket.delta
this.rtmpHandler()
} else if (0 === (this.parserPacket.bytes % this.inChunkSize)) {
this.parserState = RTMP_PARSE_INIT
}
break
}
}
}
rtmpPacketParse() {
let fmt = this.parserBuffer[0] >> 6
let cid = 0
if (this.parserBasicBytes === 2) {
cid = 64 + this.parserBuffer[1]
} else if (this.parserBasicBytes === 3) {
cid = 64 + this.parserBuffer[1] + this.parserBuffer[2] << 8
} else {
cid = this.parserBuffer[0] & 0x3F
}
let hasp = this.inPackets.has(cid)
if (!hasp) {
this.parserPacket = RtmpPacket.create(fmt, cid)
this.inPackets.set(cid, this.parserPacket)
} else {
this.parserPacket = this.inPackets.get(cid)
}
this.parserPacket.header.fmt = fmt
this.parserPacket.header.cid = cid
this.rtmpChunkMessageHeaderRead()
// Logger.log(this.parserPacket)
}
rtmpChunkMessageHeaderRead() {
let offset = this.parserBasicBytes
// timestamp / delta
if (this.parserPacket.header.fmt <= RTMP_CHUNK_TYPE_2) {
this.parserPacket.header.timestamp = this.parserBuffer.readUIntBE(offset, 3)
offset += 3
}
// message length + type
if (this.parserPacket.header.fmt <= RTMP_CHUNK_TYPE_1) {
this.parserPacket.header.length = this.parserBuffer.readUIntBE(offset, 3)
this.parserPacket.header.type = this.parserBuffer[offset + 3]
offset += 4
}
if (this.parserPacket.header.fmt === RTMP_CHUNK_TYPE_0) {
this.parserPacket.header.stream_id = this.parserBuffer.readUInt32LE(offset)
offset += 4
}
return offset
}
rtmpPacketAlloc() {
if (this.parserPacket.capacity < this.parserPacket.header.length) {
this.parserPacket.payload = Buffer.alloc(this.parserPacket.header.length + 1024)
this.parserPacket.capacity = this.parserPacket.header.length + 1024
}
}
rtmpHandler() {
switch (this.parserPacket.header.type) {
case RTMP_TYPE_SET_CHUNK_SIZE:
case RTMP_TYPE_ABORT:
case RTMP_TYPE_ACKNOWLEDGEMENT:
case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE:
case RTMP_TYPE_SET_PEER_BANDWIDTH:
return 0 === this.rtmpControlHandler() ? -1 : 0
case RTMP_TYPE_EVENT:
return 0 === this.rtmpEventHandler() ? -1 : 0
case RTMP_TYPE_AUDIO:
return this.rtmpAudioHandler()
case RTMP_TYPE_VIDEO:
return this.rtmpVideoHandler()
case RTMP_TYPE_FLEX_MESSAGE:
case RTMP_TYPE_INVOKE:
return this.rtmpInvokeHandler()
case RTMP_TYPE_FLEX_STREAM:// AMF3
case RTMP_TYPE_DATA: // AMF0
return this.rtmpDataHandler()
}
}
rtmpControlHandler() {
let payload = this.parserPacket.payload
switch (this.parserPacket.header.type) {
case RTMP_TYPE_SET_CHUNK_SIZE:
this.inChunkSize = payload.readUInt32BE()
// Logger.debug("set inChunkSize", this.inChunkSize)
break
case RTMP_TYPE_ABORT:
break
case RTMP_TYPE_ACKNOWLEDGEMENT:
break
case RTMP_TYPE_WINDOW_ACKNOWLEDGEMENT_SIZE:
this.ackSize = payload.readUInt32BE()
// Logger.debug("set ack Size", this.ackSize)
break
case RTMP_TYPE_SET_PEER_BANDWIDTH:
break
}
}
rtmpEventHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length)
let event = payload.readUInt16BE()
let value = payload.readUInt32BE(2)
// Logger.log("rtmpEventHandler", event, value)
switch (event) {
case 6:
this.rtmpSendPingResponse(value)
break
}
}
rtmpInvokeHandler() {
let offset = this.parserPacket.header.type === RTMP_TYPE_FLEX_MESSAGE ? 1 : 0
let payload = this.parserPacket.payload.slice(offset, this.parserPacket.header.length)
let invokeMessage = AMF.decodeAmf0Cmd(payload)
// Logger.log("rtmpInvokeHandler", invokeMessage)
switch (invokeMessage.cmd) {
case "_result":
this.rtmpCommandOnresult(invokeMessage)
break
case "_error":
this.rtmpCommandOnerror(invokeMessage)
break
case "onStatus":
this.rtmpCommandOnstatus(invokeMessage)
break
}
}
rtmpCommandOnresult(invokeMessage) {
// Logger.debug(invokeMessage)
switch (invokeMessage.transId) {
case RTMP_TRANSACTION_CONNECT:
this.launcher.emit("status", invokeMessage.info)
this.rtmpOnconnect()
break
case RTMP_TRANSACTION_CREATE_STREAM:
this.rtmpOncreateStream(invokeMessage.info)
break
}
}
rtmpCommandOnerror(invokeMessage) {
this.launcher.emit("status", invokeMessage.info)
}
rtmpCommandOnstatus(invokeMessage) {
this.launcher.emit("status", invokeMessage.info)
}
rtmpOnconnect() {
if (this.isPublish) {
this.rtmpSendReleaseStream()
this.rtmpSendFCPublish()
}
this.rtmpSendCreateStream()
}
rtmpOncreateStream(sid) {
this.streamId = sid
if (this.isPublish) {
this.rtmpSendPublish()
this.rtmpSendSetChunkSize()
} else {
this.rtmpSendPlay()
this.rtmpSendSetBufferLength(1000)
}
}
rtmpAudioHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length)
this.launcher.emit("audio", payload, this.parserPacket.clock)
}
rtmpVideoHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length)
this.launcher.emit("video", payload, this.parserPacket.clock)
}
rtmpDataHandler() {
let payload = this.parserPacket.payload.slice(0, this.parserPacket.header.length)
this.launcher.emit("script", payload, this.parserPacket.clock)
}
sendInvokeMessage(sid, opt) {
let packet = RtmpPacket.create()
packet.header.fmt = RTMP_CHUNK_TYPE_0
packet.header.cid = RTMP_CHANNEL_INVOKE
packet.header.type = RTMP_TYPE_INVOKE
packet.header.stream_id = sid
packet.payload = AMF.encodeAmf0Cmd(opt)
packet.header.length = packet.payload.length
let chunks = this.rtmpChunksCreate(packet)
this.socket.write(chunks)
}
rtmpSendConnect() {
let opt = {
cmd: "connect",
transId: RTMP_TRANSACTION_CONNECT,
cmdObj: {
app: this.info.app,
flashVer: FLASHVER,
tcUrl: this.info.tcurl,
fpad: 0,
capabilities: 15,
audioCodecs: 3191,
videoCodecs: 252,
videoFunction: 1,
encoding: 0
}
}
this.sendInvokeMessage(0, opt)
}
rtmpSendReleaseStream() {
let opt = {
cmd: "releaseStream",
transId: 0,
cmdObj: null,
streamName: this.info.stream,
}
this.sendInvokeMessage(this.streamId, opt)
}
rtmpSendFCPublish() {
let opt = {
cmd: "FCPublish",
transId: 0,
cmdObj: null,
streamName: this.info.stream,
}
this.sendInvokeMessage(this.streamId, opt)
}
rtmpSendCreateStream() {
let opt = {
cmd: "createStream",
transId: RTMP_TRANSACTION_CREATE_STREAM,
cmdObj: null
}
this.sendInvokeMessage(0, opt)
}
rtmpSendPlay() {
let opt = {
cmd: "play",
transId: 0,
cmdObj: null,
streamName: this.info.stream,
start: -2,
duration: -1,
reset: 1
}
this.sendInvokeMessage(this.streamId, opt)
}
rtmpSendSetBufferLength(bufferTime) {
let packet = RtmpPacket.create()
packet.header.fmt = RTMP_CHUNK_TYPE_0
packet.header.cid = RTMP_CHANNEL_PROTOCOL
packet.header.type = RTMP_TYPE_EVENT
packet.payload = Buffer.alloc(10)
packet.header.length = packet.payload.length
packet.payload.writeUInt16BE(0x03)
packet.payload.writeUInt32BE(this.streamId, 2)
packet.payload.writeUInt32BE(bufferTime, 6)
let chunks = this.rtmpChunksCreate(packet)
this.socket.write(chunks)
}
rtmpSendPublish() {
let opt = {
cmd: "publish",
transId: 0,
cmdObj: null,
streamName: this.info.stream,
type: "live"
}
this.sendInvokeMessage(this.streamId, opt)
}
rtmpSendSetChunkSize() {
let rtmpBuffer = Buffer.from("02000000000004010000000000000000", "hex")
rtmpBuffer.writeUInt32BE(this.inChunkSize, 12)
this.socket.write(rtmpBuffer)
this.outChunkSize = this.inChunkSize
}
rtmpSendFCUnpublish() {
let opt = {
cmd: "FCUnpublish",
transId: 0,
cmdObj: null,
streamName: this.info.stream,
}
this.sendInvokeMessage(this.streamId, opt)
}
rtmpSendDeleteStream() {
let opt = {
cmd: "deleteStream",
transId: 0,
cmdObj: null,
streamId: this.streamId
}
this.sendInvokeMessage(this.streamId, opt)
}
rtmpSendPingResponse(time) {
let packet = RtmpPacket.create()
packet.header.fmt = RTMP_CHUNK_TYPE_0
packet.header.cid = RTMP_CHANNEL_PROTOCOL
packet.header.type = RTMP_TYPE_EVENT
packet.payload = Buffer.alloc(6)
packet.header.length = packet.payload.length
packet.payload.writeUInt16BE(0x07)
packet.payload.writeUInt32BE(time, 2)
let chunks = this.rtmpChunksCreate(packet)
this.socket.write(chunks)
}
}
module.exports = NodeRtmpClient

View File

@ -0,0 +1,111 @@
//
// Created by Mingliang Chen on 17/8/1.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const Crypto = require("crypto")
const MESSAGE_FORMAT_0 = 0
const MESSAGE_FORMAT_1 = 1
const MESSAGE_FORMAT_2 = 2
const RTMP_SIG_SIZE = 1536
const SHA256DL = 32
const RandomCrud = Buffer.from([
0xf0, 0xee, 0xc2, 0x4a, 0x80, 0x68, 0xbe, 0xe8,
0x2e, 0x00, 0xd0, 0xd1, 0x02, 0x9e, 0x7e, 0x57,
0x6e, 0xec, 0x5d, 0x2d, 0x29, 0x80, 0x6f, 0xab,
0x93, 0xb8, 0xe6, 0x36, 0xcf, 0xeb, 0x31, 0xae
])
const GenuineFMSConst = "Genuine Adobe Flash Media Server 001"
const GenuineFMSConstCrud = Buffer.concat([Buffer.from(GenuineFMSConst, "utf8"), RandomCrud])
const GenuineFPConst = "Genuine Adobe Flash Player 001"
const GenuineFPConstCrud = Buffer.concat([Buffer.from(GenuineFPConst, "utf8"), RandomCrud])
function calcHmac(data, key) {
let hmac = Crypto.createHmac("sha256", key)
hmac.update(data)
return hmac.digest()
}
function GetClientGenuineConstDigestOffset(buf) {
let offset = buf[0] + buf[1] + buf[2] + buf[3]
offset = (offset % 728) + 12
return offset
}
function GetServerGenuineConstDigestOffset(buf) {
let offset = buf[0] + buf[1] + buf[2] + buf[3]
offset = (offset % 728) + 776
return offset
}
function detectClientMessageFormat(clientsig) {
let computedSignature, msg, providedSignature, sdl
sdl = GetServerGenuineConstDigestOffset(clientsig.slice(772, 776))
msg = Buffer.concat([clientsig.slice(0, sdl), clientsig.slice(sdl + SHA256DL)], 1504)
computedSignature = calcHmac(msg, GenuineFPConst)
providedSignature = clientsig.slice(sdl, sdl + SHA256DL)
if (computedSignature.equals(providedSignature)) {
return MESSAGE_FORMAT_2
}
sdl = GetClientGenuineConstDigestOffset(clientsig.slice(8, 12))
msg = Buffer.concat([clientsig.slice(0, sdl), clientsig.slice(sdl + SHA256DL)], 1504)
computedSignature = calcHmac(msg, GenuineFPConst)
providedSignature = clientsig.slice(sdl, sdl + SHA256DL)
if (computedSignature.equals(providedSignature)) {
return MESSAGE_FORMAT_1
}
return MESSAGE_FORMAT_0
}
function generateS1(messageFormat) {
let randomBytes = Crypto.randomBytes(RTMP_SIG_SIZE - 8)
let handshakeBytes = Buffer.concat([Buffer.from([0, 0, 0, 0, 1, 2, 3, 4]), randomBytes], RTMP_SIG_SIZE)
let serverDigestOffset
if (messageFormat === 1) {
serverDigestOffset = GetClientGenuineConstDigestOffset(handshakeBytes.slice(8, 12))
} else {
serverDigestOffset = GetServerGenuineConstDigestOffset(handshakeBytes.slice(772, 776))
}
let msg = Buffer.concat([handshakeBytes.slice(0, serverDigestOffset), handshakeBytes.slice(serverDigestOffset + SHA256DL)], RTMP_SIG_SIZE - SHA256DL)
let hash = calcHmac(msg, GenuineFMSConst)
hash.copy(handshakeBytes, serverDigestOffset, 0, 32)
return handshakeBytes
}
function generateS2(messageFormat, clientsig, callback) {
let randomBytes = Crypto.randomBytes(RTMP_SIG_SIZE - 32)
let challengeKeyOffset
if (messageFormat === 1) {
challengeKeyOffset = GetClientGenuineConstDigestOffset(clientsig.slice(8, 12))
} else {
challengeKeyOffset = GetServerGenuineConstDigestOffset(clientsig.slice(772, 776))
}
let challengeKey = clientsig.slice(challengeKeyOffset, challengeKeyOffset + 32)
let hash = calcHmac(challengeKey, GenuineFMSConstCrud)
let signature = calcHmac(randomBytes, hash)
let s2Bytes = Buffer.concat([randomBytes, signature], RTMP_SIG_SIZE)
return s2Bytes
}
function generateS0S1S2(clientsig) {
let clientType = Buffer.alloc(1, 3)
let messageFormat = detectClientMessageFormat(clientsig)
let allBytes
if (messageFormat === MESSAGE_FORMAT_0) {
// Logger.debug("[rtmp handshake] using simple handshake.")
allBytes = Buffer.concat([clientType, clientsig, clientsig])
} else {
// Logger.debug("[rtmp handshake] using complex handshake.")
allBytes = Buffer.concat([clientType, generateS1(messageFormat), generateS2(messageFormat, clientsig)])
}
return allBytes
}
module.exports = { generateS0S1S2 }

View File

@ -0,0 +1,103 @@
const fs = require("fs")
const loadash = require("lodash")
const mkdirp = require("mkdirp")
const Logger = require("../lib/logger")
const FissionSession = require("../sessionsModels/fission_session")
const { getFFmpegVersion, getFFmpegUrl } = require("../lib/utils")
const context = require("../ctx")
class NodeFissionServer {
constructor(config) {
this.config = config
this.fissionSessions = new Map()
}
async run() {
try {
mkdirp.sync(this.config.mediaroot)
fs.accessSync(this.config.mediaroot, fs.constants.W_OK)
} catch (error) {
Logger.error(`Node Media Fission Server startup failed. MediaRoot:${this.config.mediaroot} cannot be written.`)
return
}
try {
fs.accessSync(this.config.fission.ffmpeg, fs.constants.X_OK)
} catch (error) {
Logger.error(`Node Media Fission Server startup failed. ffmpeg:${this.config.fission.ffmpeg} cannot be executed.`)
return
}
let version = await getFFmpegVersion(this.config.fission.ffmpeg)
if (version === "" || parseInt(version.split(".")[0]) < 4) {
Logger.error("Node Media Fission Server startup failed. ffmpeg requires version 4.0.0 above")
Logger.error("Download the latest ffmpeg static program:", getFFmpegUrl())
return
}
context.nodeEvent.on("postPublish", this.onPostPublish.bind(this))
context.nodeEvent.on("donePublish", this.onDonePublish.bind(this))
Logger.log(`Node Media Fission Server started, MediaRoot: ${this.config.mediaroot}, ffmpeg version: ${version}`)
}
async onPostPublish(id, streamPath, args) {
const fixedStreamingKey = streamPath.split("/").pop()
const userspace = await global.resolveUserspaceOfStreamingKey(fixedStreamingKey)
if (!userspace) {
console.error("No userspace found for streaming key:", fixedStreamingKey)
return false
}
let regRes = /\/(.*)\/(.*)/gi.exec(streamPath)
let [app, name] = loadash.slice(regRes, 1)
for (let task of this.config.fission.tasks) {
regRes = /(.*)\/(.*)/gi.exec(task.rule)
let [ruleApp, ruleName] = loadash.slice(regRes, 1)
if ((app === ruleApp || ruleApp === "*") && (name === ruleName || ruleName === "*")) {
let s = context.sessions.get(id)
if (s.isLocal && name.split("_")[1]) {
continue
}
let conf = task
conf.ffmpeg = this.config.fission.ffmpeg
conf.mediaroot = this.config.mediaroot
conf.rtmpPort = this.config.rtmp.port
conf.streamPath = streamPath
conf.streamApp = app
conf.streamName = name
conf.fixedStreamName = userspace.username
conf.args = args
let session = new FissionSession(conf)
this.fissionSessions.set(id, session)
session.on("end", () => {
this.fissionSessions.delete(id)
})
session.run()
}
}
}
onDonePublish(id, streamPath, args) {
let session = this.fissionSessions.get(id)
if (session) {
session.end()
}
}
}
module.exports = NodeFissionServer

View File

@ -0,0 +1,255 @@
const fs = require("fs")
const _ = require("lodash")
const querystring = require("querystring")
const { getFFmpegVersion, getFFmpegUrl } = require("../lib/utils")
const NodeCoreUtils = require("../lib/utils")
const Logger = require("../lib/logger")
const NodeRelaySession = require("../sessionsModels/relay_session")
const context = require("../ctx")
class NodeRelayServer {
constructor(config) {
this.config = config
this.staticCycle = null
this.staticSessions = new Map()
this.dynamicSessions = new Map()
}
async run() {
try {
fs.accessSync(this.config.relay.ffmpeg, fs.constants.X_OK)
} catch (error) {
Logger.error(`Node Media Relay Server startup failed. ffmpeg:${this.config.relay.ffmpeg} cannot be executed.`)
return
}
let version = await getFFmpegVersion(this.config.relay.ffmpeg)
if (version === "" || parseInt(version.split(".")[0]) < 4) {
Logger.error("Node Media Relay Server startup failed. ffmpeg requires version 4.0.0 above")
Logger.error("Download the latest ffmpeg static program:", getFFmpegUrl())
return
}
context.nodeEvent.on("relayPull", this.onRelayPull.bind(this))
context.nodeEvent.on("relayPush", this.onRelayPush.bind(this))
context.nodeEvent.on("prePlay", this.onPrePlay.bind(this))
context.nodeEvent.on("donePlay", this.onDonePlay.bind(this))
context.nodeEvent.on("postPublish", this.onPostPublish.bind(this))
context.nodeEvent.on("donePublish", this.onDonePublish.bind(this))
this.staticCycle = setInterval(this.onStatic.bind(this), 1000)
Logger.log("Node Media Relay Server started")
}
onStatic() {
if (!this.config.relay.tasks) {
return
}
let i = this.config.relay.tasks.length
while (i--) {
if (this.staticSessions.has(i)) {
continue
}
let conf = this.config.relay.tasks[i]
let isStatic = conf.mode === "static"
if (isStatic) {
conf.name = conf.name ? conf.name : NodeCoreUtils.genRandomName()
conf.ffmpeg = this.config.relay.ffmpeg
conf.inPath = conf.edge
conf.ouPath = `rtmp://127.0.0.1:${this.config.rtmp.port}/${conf.app}/${conf.name}`
let session = new NodeRelaySession(conf)
session.id = i
session.streamPath = `/${conf.app}/${conf.name}`
session.on("end", (id) => {
this.staticSessions.delete(id)
})
this.staticSessions.set(i, session)
session.run()
Logger.log("[relay static pull] start", i, conf.inPath, "to", conf.ouPath)
}
}
}
//从远端拉推到本地
onRelayPull(url, app, name) {
let conf = {}
conf.app = app
conf.name = name
conf.ffmpeg = this.config.relay.ffmpeg
conf.inPath = url
conf.ouPath = `rtmp://127.0.0.1:${this.config.rtmp.port}/${app}/${name}`
let session = new NodeRelaySession(conf)
const id = session.id
context.sessions.set(id, session)
session.on("end", (id) => {
this.dynamicSessions.delete(id)
})
this.dynamicSessions.set(id, session)
session.run()
Logger.log("[relay dynamic pull] start id=" + id, conf.inPath, "to", conf.ouPath)
return id
}
//从本地拉推到远端
onRelayPush(url, app, name) {
let conf = {}
conf.app = app
conf.name = name
conf.ffmpeg = this.config.relay.ffmpeg
conf.inPath = `rtmp://127.0.0.1:${this.config.rtmp.port}/${app}/${name}`
conf.ouPath = url
let session = new NodeRelaySession(conf)
const id = session.id
context.sessions.set(id, session)
session.on("end", (id) => {
this.dynamicSessions.delete(id)
})
this.dynamicSessions.set(id, session)
session.run()
Logger.log("[relay dynamic push] start id=" + id, conf.inPath, "to", conf.ouPath)
}
onPrePlay(id, streamPath, args) {
if (!this.config.relay.tasks) {
return
}
let regRes = /\/(.*)\/(.*)/gi.exec(streamPath)
let [app, stream] = _.slice(regRes, 1)
let i = this.config.relay.tasks.length
while (i--) {
let conf = this.config.relay.tasks[i]
let isPull = conf.mode === "pull"
if (isPull && app === conf.app && !context.publishers.has(streamPath)) {
let hasApp = conf.edge.match(/rtmp:\/\/([^\/]+)\/([^\/]+)/)
conf.ffmpeg = this.config.relay.ffmpeg
conf.inPath = hasApp ? `${conf.edge}/${stream}` : `${conf.edge}${streamPath}`
conf.ouPath = `rtmp://127.0.0.1:${this.config.rtmp.port}${streamPath}`
if (Object.keys(args).length > 0) {
conf.inPath += "?"
conf.inPath += querystring.encode(args)
}
let session = new NodeRelaySession(conf)
session.id = id
session.on("end", (id) => {
this.dynamicSessions.delete(id)
})
this.dynamicSessions.set(id, session)
session.run()
Logger.log("[relay dynamic pull] start id=" + id, conf.inPath, "to", conf.ouPath)
}
}
}
onDonePlay(id, streamPath, args) {
let session = this.dynamicSessions.get(id)
let publisher = context.sessions.get(context.publishers.get(streamPath))
if (session && publisher.players.size == 0) {
session.end()
}
}
onPostPublish(id, streamPath, args) {
if (!this.config.relay.tasks) {
return
}
let regRes = /\/(.*)\/(.*)/gi.exec(streamPath)
let [app, stream] = _.slice(regRes, 1)
let i = this.config.relay.tasks.length
while (i--) {
let conf = this.config.relay.tasks[i]
let isPush = conf.mode === "push"
if (isPush && app === conf.app) {
let hasApp = conf.edge.match(/rtmp:\/\/([^\/]+)\/([^\/]+)/)
conf.ffmpeg = this.config.relay.ffmpeg
conf.inPath = `rtmp://127.0.0.1:${this.config.rtmp.port}${streamPath}`
conf.ouPath = conf.appendName === false ? conf.edge : (hasApp ? `${conf.edge}/${stream}` : `${conf.edge}${streamPath}`)
if (Object.keys(args).length > 0) {
conf.ouPath += "?"
conf.ouPath += querystring.encode(args)
}
let session = new NodeRelaySession(conf)
session.id = id
session.on("end", (id) => {
this.dynamicSessions.delete(id)
})
this.dynamicSessions.set(id, session)
session.run()
Logger.log("[relay dynamic push] start id=" + id, conf.inPath, "to", conf.ouPath)
}
}
}
onDonePublish(id, streamPath, args) {
let session = this.dynamicSessions.get(id)
if (session) {
session.end()
}
for (session of this.staticSessions.values()) {
if (session.streamPath === streamPath) {
session.end()
}
}
}
stop() {
clearInterval(this.staticCycle)
}
}
module.exports = NodeRelayServer

View File

@ -0,0 +1,87 @@
//
// Created by Mingliang Chen on 17/8/1.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const Tls = require("tls")
const Fs = require("fs")
const Net = require("net")
const NodeRtmpSession = require("../sessionsModels/rtmp_session")
const Logger = require("../lib/logger")
const context = require("../ctx")
const RTMP_PORT = 1935
const RTMPS_PORT = 443
class NodeRtmpServer {
constructor(config) {
config.rtmp.port = this.port = config.rtmp.port ? config.rtmp.port : RTMP_PORT
this.tcpServer = Net.createServer((socket) => {
let session = new NodeRtmpSession(config, socket)
session.run()
})
if (config.rtmp.ssl){
config.rtmp.ssl.port = this.sslPort = config.rtmp.ssl.port ? config.rtmp.ssl.port : RTMPS_PORT
try {
const options = {
key: Fs.readFileSync(config.rtmp.ssl.key),
cert: Fs.readFileSync(config.rtmp.ssl.cert)
}
this.tlsServer = Tls.createServer(options, (socket) => {
let session = new NodeRtmpSession(config, socket)
session.run()
})
} catch (e) {
Logger.error(`Node Media Rtmps Server error while reading ssl certs: <${e}>`)
}
}
}
run() {
this.tcpServer.listen(this.port, () => {
Logger.log(`Node Media Rtmp Server started on port: ${this.port}`)
})
this.tcpServer.on("error", (e) => {
Logger.error(`Node Media Rtmp Server ${e}`)
})
this.tcpServer.on("close", () => {
Logger.log("Node Media Rtmp Server Close.")
})
if (this.tlsServer) {
this.tlsServer.listen(this.sslPort, () => {
Logger.log(`Node Media Rtmps Server started on port: ${this.sslPort}`)
})
this.tlsServer.on("error", (e) => {
Logger.error(`Node Media Rtmps Server ${e}`)
})
this.tlsServer.on("close", () => {
Logger.log("Node Media Rtmps Server Close.")
})
}
}
stop() {
this.tcpServer.close()
if (this.tlsServer) {
this.tlsServer.close()
}
context.sessions.forEach((session, id) => {
if (session instanceof NodeRtmpSession)
session.stop()
})
}
}
module.exports = NodeRtmpServer

View File

@ -0,0 +1,105 @@
const fs = require("fs")
const lodash = require("lodash")
const mkdirp = require("mkdirp")
const Logger = require("../lib/logger")
const TransSession = require("../sessionsModels/trans_session")
const { getFFmpegVersion, getFFmpegUrl } = require("../lib/utils")
const context = require("../ctx")
class NodeTransServer {
constructor(config) {
this.config = config
this.transSessions = new Map()
}
async run() {
try {
mkdirp.sync(this.config.mediaroot)
fs.accessSync(this.config.mediaroot, fs.constants.W_OK)
} catch (error) {
Logger.error(`Node Media Trans Server startup failed. MediaRoot:${this.config.mediaroot} cannot be written.`)
return
}
try {
fs.accessSync(this.config.trans.ffmpeg, fs.constants.X_OK)
} catch (error) {
Logger.error(`Node Media Trans Server startup failed. ffmpeg:${this.config.trans.ffmpeg} cannot be executed.`)
return
}
let version = await getFFmpegVersion(this.config.trans.ffmpeg)
if (version === "" || parseInt(version.split(".")[0]) < 4) {
Logger.error("Node Media Trans Server startup failed. ffmpeg requires version 4.0.0 above")
Logger.error("Download the latest ffmpeg static program:", getFFmpegUrl())
return
}
let i = this.config.trans.tasks.length
let apps = ""
while (i--) {
apps += this.config.trans.tasks[i].app
apps += " "
}
context.nodeEvent.on("postPublish", this.onPostPublish.bind(this))
context.nodeEvent.on("donePublish", this.onDonePublish.bind(this))
Logger.log(`Node Media Trans Server started for apps: [ ${apps}] , MediaRoot: ${this.config.mediaroot}, ffmpeg version: ${version}`)
}
async onPostPublish(id, streamPath, args) {
const fixedStreamingKey = streamPath.split("/").pop()
const userspace = await global.resolveUserspaceOfStreamingKey(fixedStreamingKey)
if (!userspace) {
console.error("No userspace found for streaming key:", fixedStreamingKey)
return false
}
let regRes = /\/(.*)\/(.*)/gi.exec(streamPath)
let [app, name] = lodash.slice(regRes, 1)
let i = this.config.trans.tasks.length
while (i--) {
let conf = { ...this.config.trans.tasks[i] }
conf.ffmpeg = this.config.trans.ffmpeg
conf.mediaroot = this.config.mediaroot
conf.rtmpPort = this.config.rtmp.port
conf.streamPath = streamPath
conf.streamApp = app
conf.streamName = name
conf.fixedStreamName = userspace.username
conf.args = args
if (app === conf.app) {
let session = new TransSession(conf)
this.transSessions.set(id, session)
session.on("end", () => {
this.transSessions.delete(id)
})
session.run()
}
}
}
onDonePublish(id, streamPath, args) {
let session = this.transSessions.get(id)
if (session) {
session.end()
}
}
}
module.exports = NodeTransServer

View File

@ -0,0 +1,51 @@
const EventEmitter = require("events")
const { spawn } = require("child_process")
const Logger = require("../lib/logger")
class NodeFissionSession extends EventEmitter {
constructor(conf) {
super()
this.conf = conf
}
run() {
let inPath = "rtmp://127.0.0.1:" + this.conf.rtmpPort + this.conf.streamPath
let argv = ["-i", inPath]
for (let m of this.conf.model) {
let x264 = ["-c:v", "libx264", "-preset", "veryfast", "-tune", "zerolatency", "-maxrate", m.vb, "-bufsize", m.vb, "-g", parseInt(m.vf) * 2, "-r", m.vf, "-s", m.vs]
let aac = ["-c:a", "aac", "-b:a", m.ab]
let outPath = ["-f", "flv", "rtmp://127.0.0.1:" + this.conf.rtmpPort + "/" + this.conf.streamApp + "/" + this.conf.fixedStreamName + "_" + m.vs.split("x")[1]]
argv.splice(argv.length, 0, ...x264)
argv.splice(argv.length, 0, ...aac)
argv.splice(argv.length, 0, ...outPath)
}
argv = argv.filter((n) => { return n })
this.ffmpeg_exec = spawn(this.conf.ffmpeg, argv)
this.ffmpeg_exec.on("error", (e) => {
Logger.ffdebug(e)
})
this.ffmpeg_exec.stdout.on("data", (data) => {
Logger.ffdebug(`FF输出${data}`)
})
this.ffmpeg_exec.stderr.on("data", (data) => {
Logger.ffdebug(`FF输出${data}`)
})
this.ffmpeg_exec.on("close", (code) => {
Logger.log("[Fission end] " + this.conf.streamPath)
this.emit("end")
})
}
end() {
this.ffmpeg_exec.kill()
}
}
module.exports = NodeFissionSession

View File

@ -0,0 +1,218 @@
const URL = require("url")
const Logger = require("../lib/logger")
const Utils = require("../lib/utils")
const context = require("../ctx")
const FlvPacket = {
create: (payload = null, type = 0, time = 0) => {
return {
header: {
length: payload ? payload.length : 0,
timestamp: time,
type: type
},
payload: payload
}
}
}
class FlvSession {
constructor(req, res) {
this.req = req
this.res = res
this.id = Utils.generateNewSessionID()
this.ip = this.req.socket.remoteAddress
this.playStreamPath = ""
this.playArgs = null
this.isStarting = false
this.isPlaying = false
this.isIdling = false
if (this.req.nmsConnectionType === "ws") {
this.res.cork = this.res._socket.cork.bind(this.res._socket)
this.res.uncork = this.res._socket.uncork.bind(this.res._socket)
this.res.on("close", this.onReqClose.bind(this))
this.res.on("error", this.onReqError.bind(this))
this.res.write = this.res.send
this.res.end = this.res.close
this.TAG = "websocket-flv"
} else {
this.res.cork = this.res.socket.cork.bind(this.res.socket)
this.res.uncork = this.res.socket.uncork.bind(this.res.socket)
this.req.socket.on("close", this.onReqClose.bind(this))
this.req.on("error", this.onReqError.bind(this))
this.TAG = "http-flv"
}
this.numPlayCache = 0
context.sessions.set(this.id, this)
}
run() {
let method = this.req.method
let urlInfo = URL.parse(this.req.url, true)
let streamPath = urlInfo.pathname.split(".")[0]
this.connectCmdObj = { ip: this.ip, method, streamPath, query: urlInfo.query }
this.connectTime = new Date()
this.isStarting = true
Logger.log(`[${this.TAG} connect] id=${this.id} ip=${this.ip} args=${JSON.stringify(urlInfo.query)}`)
context.nodeEvent.emit("preConnect", this.id, this.connectCmdObj)
if (!this.isStarting) {
this.stop()
return
}
context.nodeEvent.emit("postConnect", this.id, this.connectCmdObj)
if (method === "GET") {
this.playStreamPath = streamPath
this.playArgs = urlInfo.query
this.onPlay()
} else {
this.stop()
}
}
stop() {
if (this.isStarting) {
this.isStarting = false
let publisherId = context.publishers.get(this.playStreamPath)
if (publisherId != null) {
context.sessions.get(publisherId).players.delete(this.id)
context.nodeEvent.emit("donePlay", this.id, this.playStreamPath, this.playArgs)
}
Logger.log(`[${this.TAG} play] Close stream. id=${this.id} streamPath=${this.playStreamPath}`)
Logger.log(`[${this.TAG} disconnect] id=${this.id}`)
context.nodeEvent.emit("doneConnect", this.id, this.connectCmdObj)
this.res.end()
context.idlePlayers.delete(this.id)
context.sessions.delete(this.id)
}
}
onReqClose() {
this.stop()
}
onReqError(e) {
this.stop()
}
reject() {
Logger.log(`[${this.TAG} reject] id=${this.id}`)
this.stop()
}
onPlay() {
context.nodeEvent.emit("prePlay", this.id, this.playStreamPath, this.playArgs)
if (!this.isStarting) {
return
}
if (!context.publishers.has(this.playStreamPath)) {
Logger.log(`[${this.TAG} play] Stream not found. id=${this.id} streamPath=${this.playStreamPath} `)
context.idlePlayers.add(this.id)
this.isIdling = true
return
}
this.onStartPlay()
}
onStartPlay() {
let publisherId = context.publishers.get(this.playStreamPath)
let publisher = context.sessions.get(publisherId)
let players = publisher.players
players.add(this.id)
//send FLV header
let FLVHeader = Buffer.from([0x46, 0x4c, 0x56, 0x01, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x00])
if (publisher.isFirstAudioReceived) {
FLVHeader[4] |= 0b00000100
}
if (publisher.isFirstVideoReceived) {
FLVHeader[4] |= 0b00000001
}
this.res.write(FLVHeader)
//send Metadata
if (publisher.metaData != null) {
let packet = FlvPacket.create(publisher.metaData, 18)
let tag = FlvSession.createFlvTag(packet)
this.res.write(tag)
}
//send aacSequenceHeader
if (publisher.audioCodec == 10) {
let packet = FlvPacket.create(publisher.aacSequenceHeader, 8)
let tag = FlvSession.createFlvTag(packet)
this.res.write(tag)
}
//send avcSequenceHeader
if (publisher.videoCodec == 7 || publisher.videoCodec == 12) {
let packet = FlvPacket.create(publisher.avcSequenceHeader, 9)
let tag = FlvSession.createFlvTag(packet)
this.res.write(tag)
}
//send gop cache
if (publisher.flvGopCacheQueue != null) {
for (let tag of publisher.flvGopCacheQueue) {
this.res.write(tag)
}
}
this.isIdling = false
this.isPlaying = true
Logger.log(`[${this.TAG} play] Join stream. id=${this.id} streamPath=${this.playStreamPath} `)
context.nodeEvent.emit("postPlay", this.id, this.playStreamPath, this.playArgs)
}
static createFlvTag(packet) {
let PreviousTagSize = 11 + packet.header.length
let tagBuffer = Buffer.alloc(PreviousTagSize + 4)
tagBuffer[0] = packet.header.type
tagBuffer.writeUIntBE(packet.header.length, 1, 3)
tagBuffer[4] = (packet.header.timestamp >> 16) & 0xff
tagBuffer[5] = (packet.header.timestamp >> 8) & 0xff
tagBuffer[6] = packet.header.timestamp & 0xff
tagBuffer[7] = (packet.header.timestamp >> 24) & 0xff
tagBuffer.writeUIntBE(0, 8, 3)
tagBuffer.writeUInt32BE(PreviousTagSize, PreviousTagSize)
packet.payload.copy(tagBuffer, 11, 0, packet.header.length)
return tagBuffer
}
}
module.exports = FlvSession

View File

@ -0,0 +1,60 @@
const EventEmitter = require("events")
const { spawn } = require("child_process")
const Logger = require("../lib/logger")
const NodeCoreUtils = require("../lib/utils")
const RTSP_TRANSPORT = ["udp", "tcp", "udp_multicast", "http"]
class NodeRelaySession extends EventEmitter {
constructor(conf) {
super()
this.conf = conf
this.id = NodeCoreUtils.generateNewSessionID()
this.TAG = "relay"
}
run() {
let format = this.conf.ouPath.startsWith("rtsp://") ? "rtsp" : "flv"
let argv = ["-re", "-i", this.conf.inPath, "-c", "copy", "-f", format, this.conf.ouPath]
if (this.conf.inPath[0] === "/" || this.conf.inPath[1] === ":") {
argv.unshift("-1")
argv.unshift("-stream_loop")
}
if (this.conf.inPath.startsWith("rtsp://") && this.conf.rtsp_transport) {
if (RTSP_TRANSPORT.indexOf(this.conf.rtsp_transport) > -1) {
argv.unshift(this.conf.rtsp_transport)
argv.unshift("-rtsp_transport")
}
}
Logger.log("[relay task] id=" + this.id, "cmd=ffmpeg", argv.join(" "))
this.ffmpeg_exec = spawn(this.conf.ffmpeg, argv)
this.ffmpeg_exec.on("error", (e) => {
Logger.ffdebug(e)
})
this.ffmpeg_exec.stdout.on("data", (data) => {
Logger.ffdebug(`FF输出${data}`)
})
this.ffmpeg_exec.stderr.on("data", (data) => {
Logger.ffdebug(`FF输出${data}`)
})
this.ffmpeg_exec.on("close", (code) => {
Logger.log("[relay end] id=" + this.id, "code=" + code)
this.emit("end", this.id)
})
}
end() {
this.ffmpeg_exec.kill()
}
}
module.exports = NodeRelaySession

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,114 @@
//
// Created by Mingliang Chen on 18/3/9.
// illuspas[a]gmail.com
// Copyright (c) 2018 Nodemedia. All rights reserved.
//
const fs = require("fs")
const EventEmitter = require("events")
const { spawn } = require("child_process")
const dateFormat = require("dateformat")
const mkdirp = require("mkdirp")
const Logger = require("../lib/logger")
class NodeTransSession extends EventEmitter {
constructor(conf) {
super()
this.conf = conf
}
run() {
let vc = this.conf.vc || "copy"
let ac = this.conf.ac || "copy"
let inPath = "rtmp://127.0.0.1:" + this.conf.rtmpPort + this.conf.streamPath
let ouPath = `${this.conf.mediaroot}/${this.conf.streamApp}/${this.conf.fixedStreamName}`
let mapStr = ""
if (this.conf.rtmp && this.conf.rtmpApp) {
if (this.conf.rtmpApp === this.conf.streamApp) {
Logger.error("[Transmuxing RTMP] Cannot output to the same app.")
} else {
let rtmpOutput = `rtmp://127.0.0.1:${this.conf.rtmpPort}/${this.conf.rtmpApp}/${this.conf.streamName}`
mapStr += `[f=flv]${rtmpOutput}|`
Logger.log("[Transmuxing RTMP] " + this.conf.streamPath + " to " + rtmpOutput)
}
}
if (this.conf.mp4) {
this.conf.mp4Flags = this.conf.mp4Flags ? this.conf.mp4Flags : ""
let mp4FileName = dateFormat("yyyy-mm-dd-HH-MM-ss") + ".mp4"
let mapMp4 = `${this.conf.mp4Flags}${ouPath}/${mp4FileName}|`
mapStr += mapMp4
Logger.log("[Transmuxing MP4] " + this.conf.streamPath + " to " + ouPath + "/" + mp4FileName)
}
if (this.conf.hls) {
this.conf.hlsFlags = this.conf.hlsFlags ? this.conf.hlsFlags : ""
let hlsFileName = "index.m3u8"
let mapHls = `${this.conf.hlsFlags}${ouPath}/${hlsFileName}|`
mapStr += mapHls
Logger.log("[Transmuxing HLS] " + this.conf.streamPath + " to " + ouPath + "/" + hlsFileName)
}
if (this.conf.dash) {
this.conf.dashFlags = this.conf.dashFlags ? this.conf.dashFlags : ""
let dashFileName = "index.mpd"
let mapDash = `${this.conf.dashFlags}${ouPath}/${dashFileName}`
mapStr += mapDash
Logger.log("[Transmuxing DASH] " + this.conf.streamPath + " to " + ouPath + "/" + dashFileName)
}
mkdirp.sync(ouPath)
let argv = ["-y", "-i", inPath]
Array.prototype.push.apply(argv, ["-c:v", vc])
Array.prototype.push.apply(argv, this.conf.vcParam)
Array.prototype.push.apply(argv, ["-c:a", ac])
Array.prototype.push.apply(argv, this.conf.acParam)
Array.prototype.push.apply(argv, ["-f", "tee", "-map", "0:a?", "-map", "0:v?", mapStr])
argv = argv.filter((n) => { return n }) //去空
this.ffmpeg_exec = spawn(this.conf.ffmpeg, argv)
this.ffmpeg_exec.on("error", (e) => {
Logger.ffdebug(e)
})
this.ffmpeg_exec.stdout.on("data", (data) => {
Logger.ffdebug(`FF输出${data}`)
})
this.ffmpeg_exec.stderr.on("data", (data) => {
Logger.ffdebug(`FF输出${data}`)
})
this.ffmpeg_exec.on("close", (code) => {
Logger.log("[Transmuxing end] " + this.conf.streamPath)
this.emit("end")
fs.readdir(ouPath, function (err, files) {
if (!err) {
files.forEach((filename) => {
if (filename.endsWith(".ts")
|| filename.endsWith(".m3u8")
|| filename.endsWith(".mpd")
|| filename.endsWith(".m4s")
|| filename.endsWith(".tmp")) {
fs.unlinkSync(ouPath + "/" + filename)
}
})
}
})
})
}
end() {
this.ffmpeg_exec.kill()
}
}
module.exports = NodeTransSession

@ -1 +0,0 @@
Subproject commit 219ca691fda874c94712e4ba86a023948940409d