# stream_server.coffee (forked from iizukanao/node-rtsp-rtmp-server)
# RTSP and RTMP/RTMPE/RTMPT/RTMPTE server implementation.
# Also serves HTTP contents as this server is meant to
# be run on port 80.
net = require 'net'
fs = require 'fs'
crypto = require 'crypto'
config = require './config'
rtmp = require './rtmp'
http = require './http'
rtsp = require './rtsp'
h264 = require './h264'
aac = require './aac'
avstreams = require './avstreams'
CustomReceiver = require './custom_receiver'
logger = require './logger'
packageJson = require './package.json'
Sequent = require 'sequent'
# Debug switch: when true, a summary line (unit count and pts) is logged
# for every incoming group of video NAL units / audio access units.
DEBUG_INCOMING_PACKET_DATA = false
# Debug switch: when true, a truncated MD5 digest, the pts and the byte
# length of every incoming video NAL unit / audio access unit is logged.
# It also enables tracking of stream.lastSentVideoTimestamp, which is used
# to log the pts difference between consecutive video units.
DEBUG_INCOMING_PACKET_HASH = false
# Default server name for RTSP and HTTP responses
DEFAULT_SERVER_NAME = "node-rtsp-rtmp-server/#{packageJson.version}"
# Server name actually used by default: config.serverName when it is set
# (non-null), otherwise DEFAULT_SERVER_NAME.
serverName = config.serverName ? DEFAULT_SERVER_NAME
# ADTS header sizes in bytes. The fixed + variable header is 7 bytes; a
# 2-byte CRC follows it when the protection_absent bit is 0.
ADTS_HEADER_LENGTH = 7
ADTS_HEADER_LENGTH_WITH_CRC = 9

# Reads the 48-bit big-endian timestamp stored in bytes 1..6 of a data
# packet handed over by the custom receiver. Byte 0 is not used here.
readPacketTimestamp = (buf) ->
  buf[1] * 0x010000000000 + \
  buf[2] * 0x0100000000   + \
  buf[3] * 0x01000000     + \
  buf[4] * 0x010000       + \
  buf[5] * 0x0100         + \
  buf[6]

# Glue between the protocol front ends. It owns one RTMP server, one RTSP
# server (which is also given the HTTP handler) and one custom-protocol
# receiver, and forwards every incoming video/audio unit to the RTSP and
# RTMP servers.
#
# constructor options (all optional):
#   serverName:   overrides the module-level default server name
#   documentRoot: passed through to the HTTP handler
class StreamServer
  constructor: (opts) ->
    @serverName = opts?.serverName ? serverName

    # Create RTMP server
    @rtmpServer = new rtmp.RTMPServer
    @rtmpServer.on 'video_start', (streamId) =>
      stream = avstreams.getOrCreate streamId
      @onReceiveVideoControlBuffer stream
    @rtmpServer.on 'video_data', (streamId, pts, dts, nalUnits) =>
      stream = avstreams.get streamId
      if stream?
        @onReceiveVideoPacket stream, nalUnits, pts, dts
      else
        logger.warn "warn: Received invalid streamId from rtmp: #{streamId}"
    @rtmpServer.on 'audio_start', (streamId) =>
      stream = avstreams.getOrCreate streamId
      @onReceiveAudioControlBuffer stream
    @rtmpServer.on 'audio_data', (streamId, pts, dts, adtsFrame) =>
      stream = avstreams.get streamId
      if stream?
        @onReceiveAudioPacket stream, adtsFrame, pts, dts
      else
        logger.warn "warn: Received invalid streamId from rtmp: #{streamId}"

    # Setup data receivers for custom protocol.
    # Explicit splat parameters are used instead of `arguments`: with
    # CoffeeScript 2 a fat-arrow function becomes an ES2015 arrow function,
    # which has no `arguments` of its own, so `arguments...` would forward
    # the constructor's arguments rather than the callback's.
    @customReceiver = new CustomReceiver config.receiverType,
      videoControl: (args...) =>
        @onReceiveVideoControlBuffer args...
      audioControl: (args...) =>
        @onReceiveAudioControlBuffer args...
      videoData: (args...) =>
        @onReceiveVideoDataBuffer args...
      audioData: (args...) =>
        @onReceiveAudioDataBuffer args...

    # Delete old sockets
    @customReceiver.deleteReceiverSocketsSync()

    @httpHandler = new http.HTTPHandler
      serverName: @serverName
      documentRoot: opts?.documentRoot

    @rtspServer = new rtsp.RTSPServer
      serverName : @serverName
      httpHandler: @httpHandler
      rtmpServer : @rtmpServer
    @rtspServer.on 'video_start', (stream) =>
      @onReceiveVideoControlBuffer stream
    @rtspServer.on 'audio_start', (stream) =>
      @onReceiveAudioControlBuffer stream
    @rtspServer.on 'video', (stream, nalUnits, pts, dts) =>
      @onReceiveVideoNALUnits stream, nalUnits, pts, dts
    @rtspServer.on 'audio', (stream, accessUnits, pts, dts) =>
      @onReceiveAudioAccessUnits stream, accessUnits, pts, dts

    # lastSentVideoTimestamp is only needed for the pts_diff debug output
    avstreams.on 'new', (stream) ->
      if DEBUG_INCOMING_PACKET_HASH
        stream.lastSentVideoTimestamp = 0

    avstreams.on 'reset', (stream) ->
      if DEBUG_INCOMING_PACKET_HASH
        stream.lastSentVideoTimestamp = 0

    # TODO: Do we need to do something for remove_stream event?
    #avstreams.on 'remove_stream', (stream) ->
    #  logger.raw "received remove_stream event from stream #{stream.id}"

  # Removes the custom receiver's sockets, then invokes callback (if given).
  # NOTE(review): the RTMP and RTSP servers are not stopped here - confirm
  # whether that is intentional.
  stop: (callback) ->
    @customReceiver.deleteReceiverSocketsSync()
    callback?()

  # Starts the RTMP server, the custom receiver and the RTSP server
  # (on config.serverPort). callback (if given) is invoked once both the
  # RTMP and the RTSP server have reported that they are started.
  start: (callback) ->
    seq = new Sequent

    @rtmpServer.start ->
      # RTMP server is ready
      seq.done()

    # Start data receivers for custom protocol
    @customReceiver.start()

    @rtspServer.start { port: config.serverPort }, ->
      seq.done()

    # Two seq.done() calls are expected: one from RTMP, one from RTSP
    seq.wait 2, ->
      callback?()

  # Passes func to the RTSP server as its live path consumer.
  setLivePathConsumer: (func) ->
    @rtspServer.setLivePathConsumer func

  # Marks the start of video on stream and resets its frame rate.
  # Both start times are set to the same instant.
  # buf argument can be null (not used)
  onReceiveVideoControlBuffer: (stream, buf) ->
    logger.debug "video start"
    stream.resetFrameRate stream
    stream.isVideoStarted = true
    stream.timeAtVideoStart = Date.now()
    stream.timeAtAudioStart = stream.timeAtVideoStart
    # stream.spropParameterSets = ''

  # Marks the start of audio on stream.
  # Both start times are set to the same instant.
  # buf argument can be null (not used)
  onReceiveAudioControlBuffer: (stream, buf) ->
    logger.debug "audio start"
    stream.isAudioStarted = true
    stream.timeAtAudioStart = Date.now()
    stream.timeAtVideoStart = stream.timeAtAudioStart

  # Handles a video data packet from the custom receiver.
  # Bytes 1..6 of buf hold the PTS, the rest (from byte 7) is the payload.
  onReceiveVideoDataBuffer: (stream, buf) ->
    pts = readPacketTimestamp buf
    # TODO: Support dts
    dts = pts
    nalUnit = buf[7..]
    @onReceiveVideoPacket stream, nalUnit, pts, dts

  # Handles an audio data packet from the custom receiver.
  # Bytes 1..6 of buf hold the PTS, the rest (from byte 7) is the payload.
  onReceiveAudioDataBuffer: (stream, buf) ->
    pts = readPacketTimestamp buf
    # TODO: Support dts
    dts = pts
    adtsFrame = buf[7..]
    @onReceiveAudioPacket stream, adtsFrame, pts, dts

  # Forwards already-split NAL units to the RTSP and RTMP servers and
  # updates the stream's frame rate when a picture is among them.
  #
  # nal_unit_type 5 must not be separated from the 7 and 8 units that
  # share the same timestamp as 5
  onReceiveVideoNALUnits: (stream, nalUnits, pts, dts) ->
    if DEBUG_INCOMING_PACKET_DATA
      logger.info "receive video: num_nal_units=#{nalUnits.length} pts=#{pts}"

    # rtspServer will parse nalUnits and updates SPS/PPS for the stream,
    # so we don't need to parse them here.
    # TODO: Should SPS/PPS be parsed here?
    @rtspServer.sendVideoData stream, nalUnits, pts, dts

    @rtmpServer.sendVideoPacket stream, nalUnits, pts, dts

    hasVideoFrame = false
    for nalUnit in nalUnits
      nalUnitType = h264.getNALUnitType nalUnit
      if (nalUnitType is h264.NAL_UNIT_TYPE_IDR_PICTURE) or (nalUnitType is h264.NAL_UNIT_TYPE_NON_IDR_PICTURE) # 5 (key frame) or 1 (inter frame)
        hasVideoFrame = true
        # One picture is enough to know the answer; keep iterating only
        # when every unit has to be hashed for the debug output.
        if not DEBUG_INCOMING_PACKET_HASH
          break
      if DEBUG_INCOMING_PACKET_HASH
        md5 = crypto.createHash 'md5'
        md5.update nalUnit
        tsDiff = pts - stream.lastSentVideoTimestamp
        logger.info "video: pts=#{pts} pts_diff=#{tsDiff} md5=#{md5.digest('hex')[0..6]} nal_unit_type=#{nalUnitType} bytes=#{nalUnit.length}"
        stream.lastSentVideoTimestamp = pts

    if hasVideoFrame
      stream.calcFrameRate pts

    return

  # Takes H.264 NAL units separated by start code (0x000001)
  #
  # arguments:
  #   nalUnitGlob: Buffer
  #   pts: timestamp in 90 kHz clock rate (PTS)
  onReceiveVideoPacket: (stream, nalUnitGlob, pts, dts) ->
    nalUnits = h264.splitIntoNALUnits nalUnitGlob
    if nalUnits.length is 0
      return
    @onReceiveVideoNALUnits stream, nalUnits, pts, dts
    return

  # Forwards audio access units to the RTSP server (all at once) and to
  # the RTMP server (one by one, each with its own timestamp).
  #
  # pts, dts: in 90KHz clock rate
  onReceiveAudioAccessUnits: (stream, accessUnits, pts, dts) ->
    @rtspServer.sendAudioData stream, accessUnits, pts, dts

    if DEBUG_INCOMING_PACKET_DATA
      logger.info "receive audio: num_access_units=#{accessUnits.length} pts=#{pts}"

    # Duration of one access unit in 90 kHz ticks, taking one access unit
    # as 1024 samples.
    ptsPerFrame = 90000 / (stream.audioSampleRate / 1024)

    for accessUnit, i in accessUnits
      if DEBUG_INCOMING_PACKET_HASH
        md5 = crypto.createHash 'md5'
        md5.update accessUnit
        logger.info "audio: pts=#{pts} md5=#{md5.digest('hex')[0..6]} bytes=#{accessUnit.length}"
      @rtmpServer.sendAudioPacket stream, accessUnit,
        Math.round(pts + ptsPerFrame * i),
        Math.round(dts + ptsPerFrame * i)

    # Explicit return so that the loop above is not compiled into a
    # comprehension that collects and returns an unused results array.
    return

  # Takes one or more concatenated ADTS frames, updates the stream's audio
  # config from the first frame's header, strips the ADTS header from each
  # frame and forwards the raw data blocks.
  #
  # pts, dts: in 90KHz clock rate
  onReceiveAudioPacket: (stream, adtsFrameGlob, pts, dts) ->
    adtsFrames = aac.splitIntoADTSFrames adtsFrameGlob
    if adtsFrames.length is 0
      return
    adtsInfo = aac.parseADTSFrame adtsFrames[0]

    stream.updateConfig
      audioSampleRate: adtsInfo.sampleRate
      audioClockRate: adtsInfo.sampleRate
      audioChannels: adtsInfo.channels
      audioObjectType: adtsInfo.audioObjectType

    rawDataBlocks = for adtsFrame in adtsFrames
      # protection_absent is the least significant bit of the second
      # header byte; when it is 0 a 2-byte CRC follows the 7-byte header
      # and must be skipped as well.
      hasCRC = (adtsFrame[1] & 0x01) is 0
      headerLength = if hasCRC then ADTS_HEADER_LENGTH_WITH_CRC else ADTS_HEADER_LENGTH
      adtsFrame[headerLength..]

    @onReceiveAudioAccessUnits stream, rawDataBlocks, pts, dts
    return
# The StreamServer class is the only thing this module exports.
module.exports = StreamServer