diff --git a/src/server.js b/src/server.js index c433e33..9935dfc 100644 --- a/src/server.js +++ b/src/server.js @@ -190,9 +190,12 @@ function setupRest(app) { token: token, iceServers: iceServers, subscribers: {}, - enabled: false + enabled: false, + active: false, }; whep.info('[' + id + '] Created new WHEP endpoint'); + // Monitor the state of the mountpoint on a regular basis + monitorEndpoint(endpoints[id]); // Done res.sendStatus(200); }); @@ -212,12 +215,16 @@ function setupRest(app) { return; } whep.debug("/endpoint/:", id); - // If we received an SDP, the client is providing an offer + // If we received a payload, make sure it's an SDP whep.debug(req.body); - if(req.headers["content-type"] === "application/sdp" && req.body.indexOf('v=0') >= 0) { - res.status(403); - res.send('Client offers unsupported'); - return; + let offer = null; + if(req.headers["content-type"]) { + if(req.headers["content-type"] !== "application/sdp" || req.body.indexOf('v=0') < 0) { + res.status(406); + res.send('Unsupported content type'); + return; + } + offer = req.body; } // Check the Bearer token let auth = req.headers["authorization"]; @@ -255,8 +262,19 @@ function setupRest(app) { let subscriber = subscribers[uuid]; if(subscriber) { whep.info('[' + subscriber.whepId + '][' + uuid + '] PeerConnection detected as closed'); - janus.removeSession({ uuid: endpoint.uuid }); + janus.removeSession({ uuid: uuid }); + delete subscriber.sse; delete subscribers[uuid]; + let endpoint = endpoints[subscriber.whepId]; + if(endpoint) { + delete endpoint.subscribers[uuid]; + // Notify updated viewers count + let count = Object.keys(endpoint.subscribers).length; + notifyEndpointSubscribers(endpoint, { + type: 'viewercount', + data: JSON.stringify({ viewercount: count }) + }); + } } } }); @@ -264,11 +282,12 @@ function setupRest(app) { let details = { uuid: uuid, mountpoint: endpoint.mountpoint, - pin: endpoint.pin + pin: endpoint.pin, + sdp: offer }; subscriber.enabled = true; janus.subscribe(details, function(err, result) { - // Make sure we got an OFFER back + // Make sure we got an SDP back if(err) { delete subscribers[uuid]; res.status(500); @@ -278,15 +297,21 @@ function setupRest(app) { endpoint.subscribers[uuid] = true; subscriber.resource = config.rest + '/resource/' + uuid; subscriber.latestEtag = janus.generateRandomString(16); + // Notify updated viewers count + let count = Object.keys(endpoint.subscribers).length; + notifyEndpointSubscribers(endpoint, { + type: 'viewercount', + data: JSON.stringify({ viewercount: count }) + }); // Done res.setHeader('Access-Control-Expose-Headers', 'Location, Link'); res.setHeader('Accept-Patch', 'application/trickle-ice-sdpfrag'); res.setHeader('Location', subscriber.resource); res.set('ETag', '"' + subscriber.latestEtag + '"'); let iceServers = endpoint.iceServers ? endpoint.iceServers : config.iceServers; + let links = []; if(iceServers && iceServers.length > 0) { // Add a Link header for each static ICE server - let links = []; for(let server of iceServers) { if(!server.uri || (server.uri.indexOf('stun:') !== 0 && server.uri.indexOf('turn:') !== 0 && @@ -301,8 +326,13 @@ function setupRest(app) { } links.push(link); } - res.setHeader('Link', links); } + // Advertise support for SSE + let link = '<' + config.rest + '/sse/' + uuid + '>; ' + + 'rel="urn:ietf:params:whep:ext:core:server-sent-events"; ' + + 'events="active,inactive,layers,viewercount"'; + links.push(link); + res.setHeader('Link', links); res.writeHeader(201, { 'Content-Type': 'application/sdp' }); res.write(result.jsep.sdp); res.end(); @@ -353,8 +383,16 @@ function setupRest(app) { janus.finalize(details, function(err, result) { if(err) { let endpoint = endpoints[subscriber.whepId]; - if(endpoint) + if(endpoint) { delete endpoint.subscribers[uuid]; + // Notify updated viewers count + let count = Object.keys(endpoint.subscribers).length; + notifyEndpointSubscribers(endpoint, { + type: 'viewercount', + data: JSON.stringify({ viewercount: count }) + }); + } + delete subscriber.sse; delete subscribers[uuid]; res.status(500); res.send(err.error); @@ -476,6 +514,13 @@ function setupRest(app) { if(janus) janus.removeSession({ uuid: uuid }); delete endpoint.subscribers[uuid]; + // Notify updated viewers count + let count = Object.keys(endpoint.subscribers).length; + notifyEndpointSubscribers(endpoint, { + type: 'viewercount', + data: JSON.stringify({ viewercount: count }) + }); + delete subscriber.sse; delete subscribers[uuid]; whep.info('[' + uuid + '] Terminating WHEP session'); // Done @@ -483,19 +528,105 @@ function setupRest(app) { }); // GET, HEAD, POST and PUT on the resource must return a 405 - router.get('/resource/:id', function(req, res) { + router.get('/resource/:uuid', function(req, res) { res.sendStatus(405); }); - router.head('/resource/:id', function(req, res) { + router.head('/resource/:uuid', function(req, res) { res.sendStatus(405); }); - router.post('/resource/:id', function(req, res) { + router.post('/resource/:uuid', function(req, res) { res.sendStatus(405); }); - router.put('/resource/:id', function(req, res) { + router.put('/resource/:uuid', function(req, res) { res.sendStatus(405); }); + // Create a SSE + router.post('/sse/:uuid', function(req, res) { + let uuid = req.params.uuid; + let subscriber = subscribers[uuid]; + if(!uuid || !subscriber) { + res.status(404); + res.send('Invalid resource ID'); + return; + } + let endpoint = endpoints[subscriber.whepId]; + if(!endpoint) { + res.status(404); + res.send('Invalid WHEP endpoint'); + return; + } + // Make sure we received a JSON array + if(req.headers['content-type'] !== 'application/json' || !Array.isArray(req.body)) { + res.status(406); + res.send('Unsupported content type'); + return; + } + if(!subscriber.sse) { + subscriber.sse = {}; + for(let ev of req.body) + subscriber.sse[ev] = true; + // FIXME + subscriber.events = []; + // Send a viewercount event right away + subscriber.events.push({ + type: 'viewercount', + data: JSON.stringify({ viewercount: Object.keys(endpoint.subscribers).length }) + }); + } + res.setHeader('Location', config.rest + '/sse/' + uuid); + // Done + res.sendStatus(201); + }); + + // Helper function to wait some time (needed for long poll) + async function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }).catch(function() {}); + }; + + // Long poll associated with an existing SSE + router.get('/sse/:uuid', async function(req, res) { + let uuid = req.params.uuid; + let subscriber = subscribers[uuid]; + if(!uuid || !subscriber || !subscriber.sse) { + res.status(404); + res.send('Invalid subscription'); + return; + } + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Connection', 'keep-alive'); + res.write('retry: 2000\n\n'); + while(subscriber.events) { + if(subscriber.events.length > 0) { + let event = subscriber.events.shift(); + if(event.type && subscriber.sse && subscriber.sse[event.type]) { + res.write('event: ' + event.type + '\n'); + res.write('data: ' + event.data + '\n\n'); + } + } else { + await sleep(200); + } + } + }); + + // Get rid of an existing SSE + router.delete('/sse/:uuid', async function(req, res) { + let uuid = req.params.uuid; + let subscriber = subscribers[uuid]; + if(!uuid || !subscriber || !subscriber.sse) { + res.status(404); + res.send('Invalid subscription'); + return; + } + delete subscriber.sse; + delete subscriber.events; + // Done + res.sendStatus(200); + }); + // Simple, non-standard, interface to destroy existing endpoints router.delete('/endpoint/:id', function(req, res) { let id = req.params.id; @@ -543,5 +674,56 @@ function setupRest(app) { app.use(bodyParser.json()); app.use(bodyParser.text({ type: 'application/sdp' })); app.use(bodyParser.text({ type: 'application/trickle-ice-sdpfrag' })); + app.use(bodyParser.text({ type: 'application/json' })); app.use(config.rest, router); } + +// Helper fucntion to monitor endpoints/mountpoints +function monitorEndpoint(endpoint) { + if(!endpoint) + return; + let id = endpoint.id; + setTimeout(function() { + let endpoint = endpoints[id]; + if(!endpoint) + return; + if(!janus || !janus.isReady() || janus.getState() !== "connected") { + // Try again later + monitorEndpoint(endpoint); + return; + } + let details = { + whepId: endpoint.id, + mountpoint: endpoint.mountpoint + }; + janus.isMountpointActive(details, function(err, res) { + if(err) { + // Try again later + whep.err(err); + monitorEndpoint(endpoint); + return; + } + if(res.active !== endpoint.active) { + // Notify endpoint status + endpoint.active = res.active; + notifyEndpointSubscribers(endpoint, { + type: (endpoint.active ? 'active' : 'inactive'), + data: JSON.stringify({}) + }); + } + // Done, schedule a new check for later + monitorEndpoint(endpoint); + }); + }, 2000); +} + +// Helper function to notify events to all subscribers of an endpoint +function notifyEndpointSubscribers(endpoint, event) { + if(!endpoint || !event) + return; + for(let uuid in endpoint.subscribers) { + let s = subscribers[uuid]; + if(s && s.sse && s.events) + s.events.push(event); + } +} diff --git a/src/whep-janus.js b/src/whep-janus.js index 3f11253..27c6047 100644 --- a/src/whep-janus.js +++ b/src/whep-janus.js @@ -170,9 +170,22 @@ var whepJanus = function(janusConfig) { that.config.janus.multistream = (response.version >= 1000); whep.info("Janus instance version: " + response.version_string + " (" + (that.config.janus.multistream ? "multistream" : "legacy") + ")"); - // We're done - that.config.janus.state = "connected"; - callback(); + // Finally, create a manager handle we'll use for monitoring + let attach = { + janus: "attach", + session_id: that.config.janus.session.id, + plugin: "janus.plugin.streaming" + }; + janusSend(attach, function(response) { + whep.debug("Attach response:", response); + // Unsubscribe from the transaction + delete that.config.janus.transactions[response["transaction"]]; + // Take note of the handle ID + that.config.janus.manager = response["data"]["id"]; + // We're done + that.config.janus.state = "connected"; + callback(); + }); }); }); }); @@ -195,6 +208,58 @@ var whepJanus = function(janusConfig) { delete sessions[uuid]; }; + // Public method to retrieve info on a specific mountpoint: we use this + // to let WHEP endpoints monitor when a mountpoint becomes active/inactive + this.isMountpointActive = function(details, callback) { + callback = (typeof callback === "function") ? callback : noop; + if(!details.mountpoint || !details.whepId) { + callback({ error: "Missing mandatory attribute(s)" }); + return; + } + let mountpoint = details.mountpoint; + let whepId = details.whepId; + // Send a different request according to the medium we're setting up + let info = { + janus: "message", + session_id: that.config.janus.session.id, + handle_id: that.config.janus.manager, + body: { + request: "info", + id: mountpoint + } + }; + janusSend(info, function(response) { + let event = response["janus"]; + // Get the plugin data: is this a success or an error? + let data = response.plugindata.data; + if(data.error) { + // Unsubscribe from the call transaction + delete that.config.janus.transactions[response["transaction"]]; + whep.err("Got an error querying mountpoint:", data.error); + callback({ error: data.error }); + return; + } + let active = false; + let info = data.info; + if(that.config.janus.multistream) { + // Janus 1.x response, iterate on the media array + for(let m of info.media) { + if(m.age_ms && m.age_ms < 1000) { + active = true; + break; + } + } + } else { + // Janus 0.x response + if((info.audio_age_ms && info.audio_age_ms < 1000) || + (info.video_age_ms && info.video_age_ms < 1000)) + active = true; + } + // Done + callback(null, { whepId: whepId, active: active }); + }); + }; + // Public method for subscribing to a Streaming plugin mountpoint this.subscribe = function(details, callback) { callback = (typeof callback === "function") ? callback : noop; @@ -205,6 +270,7 @@ var whepJanus = function(janusConfig) { } let mountpoint = details.mountpoint; let pin = details.pin; + let sdp = details.sdp; let uuid = details.uuid; let session = sessions[uuid]; if(!session) { @@ -263,6 +329,10 @@ var whepJanus = function(janusConfig) { pin: pin } }; + if(sdp) { + // We're going to let the user provide the SDP offer + subscribe.jsep = { type: 'offer', sdp: sdp }; + } janusSend(subscribe, function(response) { let event = response["janus"]; if(event === "error") { @@ -284,7 +354,7 @@ var whepJanus = function(janusConfig) { callback({ error: data.error }); return; } - whep.debug("Got an offer for session " + uuid + ":", data); + whep.debug("Got an SDP for session " + uuid + ":", data); if(data["reason"]) { // Unsubscribe from the transaction delete that.config.janus.transactions[response["transaction"]]; diff --git a/web/watch.js b/web/watch.js index 0481eb2..e601691 100644 --- a/web/watch.js +++ b/web/watch.js @@ -1,10 +1,10 @@ // Base path for the REST WHEP API var rest = '/whep'; -var resource = null; +var resource = null, token = null; // PeerConnection var pc = null; -var iceUfrag = null, icePwd = null; +var iceUfrag = null, icePwd = null, candidates = []; // Helper function to get query string arguments function getQueryStringValue(name) { @@ -15,6 +15,8 @@ function getQueryStringValue(name) { } // Get the endpoint ID to subscribe to var id = getQueryStringValue('id'); +// Check if we should let the endpoint send the offer +var sendOffer = (getQueryStringValue('offer') === 'true') $(document).ready(function() { // Make sure WebRTC is supported by the browser @@ -30,88 +32,108 @@ $(document).ready(function() { title: 'Insert the endpoint token (leave it empty if not needed)', inputType: 'password', callback: function(result) { - subscribeToEndpoint(result); + token = result; + subscribeToEndpoint(); } }); }); // Function to subscribe to the WHEP endpoint -function subscribeToEndpoint(token) { - let headers = null; +async function subscribeToEndpoint() { + let headers = null, offer = null; if(token) headers = { Authorization: 'Bearer ' + token }; + if(sendOffer) { + // We need to prepare an offer ourselves, do it now + let iceServers = [{urls: "stun:stun.l.google.com:19302"}]; + createPeerConnectionIfNeeded(iceServers); + let transceiver = await pc.addTransceiver('audio'); + if(transceiver.setDirection) + transceiver.setDirection('recvonly'); + else + transceiver.direction = 'recvonly'; + transceiver = await pc.addTransceiver('video'); + if(transceiver.setDirection) + transceiver.setDirection('recvonly'); + else + transceiver.direction = 'recvonly'; + offer = await pc.createOffer({}); + await pc.setLocalDescription(offer); + // Extract ICE ufrag and pwd (for trickle) + iceUfrag = offer.sdp.match(/a=ice-ufrag:(.*)\r\n/)[1]; + icePwd = offer.sdp.match(/a=ice-pwd:(.*)\r\n/)[1]; + } + // Contact the WHEP endpoint $.ajax({ url: rest + '/endpoint/' + id, type: 'POST', headers: headers, - data: {} + contentType: offer ? 'application/sdp' : null, + data: offer ? offer.sdp : {} }).error(function(xhr, textStatus, errorThrown) { bootbox.alert(xhr.status + ": " + xhr.responseText); }).success(function(sdp, textStatus, request) { - console.log('Got offer:', sdp); + console.log('Got SDP:', sdp); resource = request.getResponseHeader('Location'); console.log('WHEP resource:', resource); - // TODO Parse ICE servers - // let ice = request.getResponseHeader('Link'); - let iceServers = [{urls: "stun:stun.l.google.com:19302"}]; - // Create PeerConnection - let pc_config = { - sdpSemantics: 'unified-plan', - iceServers: iceServers - }; - pc = new RTCPeerConnection(pc_config); - pc.oniceconnectionstatechange = function() { - console.log('[ICE] ', pc.iceConnectionState); - }; - pc.onicecandidate = function(event) { - let end = false; - if(!event.candidate || (event.candidate.candidate && event.candidate.candidate.indexOf('endOfCandidates') > 0)) { - console.log('End of candidates'); - end = true; - } else { - console.log('Got candidate:', event.candidate.candidate); - } - if(!resource) { - console.warn('No resource URL, ignoring candidate'); - return; - } - if(!iceUfrag || !icePwd) { - console.warn('No ICE credentials, ignoring candidate'); - return; + // FIXME Parse Link headers (for ICE servers and/or SSE) + let iceServers = []; + let links = request.getResponseHeader('Link'); + let l = links.split('<'); + for(let i of l) { + if(!i || i.length === 0) + continue; + if(i.indexOf('ice-server') !== -1) { + // TODO Parse TURN attributes + let url = i.split('>')[0]; + iceServers.push({ urls: url }); + } else if(i.indexOf('urn:ietf:params:whep:ext:core:server-sent-events') !== -1) { + // TODO Parse event attribute + let url = i.split('>')[0]; + let events = [ 'active', 'inactive', 'layers', 'viewercount' ]; + startSSE(url, events); } - // FIXME Trickle candidate - let candidate = - 'a=ice-ufrag:' + iceUfrag + '\r\n' + - 'a=ice-pwd:' + icePwd + '\r\n' + - 'm=audio 9 RTP/AVP 0\r\n' + - 'a=' + (end ? 'end-of-candidates' : event.candidate.candidate) + '\r\n'; - $.ajax({ - url: resource, - type: 'PATCH', - headers: headers, - contentType: 'application/trickle-ice-sdpfrag', - data: candidate - }).error(function(xhr, textStatus, errorThrown) { - bootbox.alert(xhr.status + ": " + xhr.responseText); - }).done(function(response) { - console.log('Candidate sent'); - }); - }; - pc.ontrack = function(event) { - console.log('Handling Remote Track', event); - if(!event.streams) - return; - if($('#whepvideo').length === 0) { - $('#video').removeClass('hide').show(); - $('#videoremote').append('