Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial effort to implement SSE #3

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 198 additions & 16 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,12 @@ function setupRest(app) {
token: token,
iceServers: iceServers,
subscribers: {},
enabled: false
enabled: false,
active: false,
};
whep.info('[' + id + '] Created new WHEP endpoint');
// Monitor the state of the mountpoint on a regular basis
monitorEndpoint(endpoints[id]);
// Done
res.sendStatus(200);
});
Expand All @@ -212,12 +215,16 @@ function setupRest(app) {
return;
}
whep.debug("/endpoint/:", id);
// If we received an SDP, the client is providing an offer
// If we received a payload, make sure it's an SDP
whep.debug(req.body);
if(req.headers["content-type"] === "application/sdp" && req.body.indexOf('v=0') >= 0) {
res.status(403);
res.send('Client offers unsupported');
return;
let offer = null;
if(req.headers["content-type"]) {
if(req.headers["content-type"] !== "application/sdp" || req.body.indexOf('v=0') < 0) {
res.status(406);
res.send('Unsupported content type');
return;
}
offer = req.body;
}
// Check the Bearer token
let auth = req.headers["authorization"];
Expand Down Expand Up @@ -255,20 +262,32 @@ function setupRest(app) {
let subscriber = subscribers[uuid];
if(subscriber) {
whep.info('[' + subscriber.whepId + '][' + uuid + '] PeerConnection detected as closed');
janus.removeSession({ uuid: endpoint.uuid });
janus.removeSession({ uuid: uuid });
delete subscriber.sse;
delete subscribers[uuid];
let endpoint = endpoints[subscriber.whepId];
if(endpoint) {
delete endpoint.subscribers[uuid];
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
}
}
}
});
// Prepare the subscription request
let details = {
uuid: uuid,
mountpoint: endpoint.mountpoint,
pin: endpoint.pin
pin: endpoint.pin,
sdp: offer
};
subscriber.enabled = true;
janus.subscribe(details, function(err, result) {
// Make sure we got an OFFER back
// Make sure we got an SDP back
if(err) {
delete subscribers[uuid];
res.status(500);
Expand All @@ -278,15 +297,21 @@ function setupRest(app) {
endpoint.subscribers[uuid] = true;
subscriber.resource = config.rest + '/resource/' + uuid;
subscriber.latestEtag = janus.generateRandomString(16);
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
// Done
res.setHeader('Access-Control-Expose-Headers', 'Location, Link');
res.setHeader('Accept-Patch', 'application/trickle-ice-sdpfrag');
res.setHeader('Location', subscriber.resource);
res.set('ETag', '"' + subscriber.latestEtag + '"');
let iceServers = endpoint.iceServers ? endpoint.iceServers : config.iceServers;
let links = [];
if(iceServers && iceServers.length > 0) {
// Add a Link header for each static ICE server
let links = [];
for(let server of iceServers) {
if(!server.uri || (server.uri.indexOf('stun:') !== 0 &&
server.uri.indexOf('turn:') !== 0 &&
Expand All @@ -301,8 +326,13 @@ function setupRest(app) {
}
links.push(link);
}
res.setHeader('Link', links);
}
// Advertise support for SSE
let link = '<' + config.rest + '/sse/' + uuid + '>; ' +
'rel="urn:ietf:params:whep:ext:core:server-sent-events"; ' +
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heads-up: I noticed there's some inconsistency about these rel values in the spec.

Opened wish-wg/webrtc-http-egress-protocol#13 to discuss that and come up with a canonical answer, but this line and its parallel in web/watch.js may need to change.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inconsistency has been resolved. No action is needed in this PR.

Sergio's updated the spec and reference lib to consistently use urn:ietf:params:whep:ext:core:server-sent-events (the value used in this PR).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up! (and sorry for the late feedback, I was busy at the IETF meeting).

'events="active,inactive,layers,viewercount"';
links.push(link);
res.setHeader('Link', links);
res.writeHeader(201, { 'Content-Type': 'application/sdp' });
res.write(result.jsep.sdp);
res.end();
Expand Down Expand Up @@ -353,8 +383,16 @@ function setupRest(app) {
janus.finalize(details, function(err, result) {
if(err) {
let endpoint = endpoints[subscriber.whepId];
if(endpoint)
if(endpoint) {
delete endpoint.subscribers[uuid];
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
}
delete subscriber.sse;
delete subscribers[uuid];
res.status(500);
res.send(err.error);
Expand Down Expand Up @@ -476,26 +514,119 @@ function setupRest(app) {
if(janus)
janus.removeSession({ uuid: uuid });
delete endpoint.subscribers[uuid];
// Notify updated viewers count
let count = Object.keys(endpoint.subscribers).length;
notifyEndpointSubscribers(endpoint, {
type: 'viewercount',
data: JSON.stringify({ viewercount: count })
});
delete subscriber.sse;
delete subscribers[uuid];
whep.info('[' + uuid + '] Terminating WHEP session');
// Done
res.sendStatus(200);
});

// GET, HEAD, POST and PUT on the resource must return a 405
router.get('/resource/:id', function(req, res) {
router.get('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});
router.head('/resource/:id', function(req, res) {
router.head('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});
router.post('/resource/:id', function(req, res) {
router.post('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});
router.put('/resource/:id', function(req, res) {
router.put('/resource/:uuid', function(req, res) {
res.sendStatus(405);
});

// Create a SSE
router.post('/sse/:uuid', function(req, res) {
let uuid = req.params.uuid;
let subscriber = subscribers[uuid];
if(!uuid || !subscriber) {
res.status(404);
res.send('Invalid resource ID');
return;
}
let endpoint = endpoints[subscriber.whepId];
if(!endpoint) {
res.status(404);
res.send('Invalid WHEP endpoint');
return;
}
// Make sure we received a JSON array
if(req.headers['content-type'] !== 'application/json' || !Array.isArray(req.body)) {
res.status(406);
res.send('Unsupported content type');
return;
}
if(!subscriber.sse) {
subscriber.sse = {};
for(let ev of req.body)
subscriber.sse[ev] = true;
// FIXME
subscriber.events = [];
// Send a viewercount event right away
subscriber.events.push({
type: 'viewercount',
data: JSON.stringify({ viewercount: Object.keys(endpoint.subscribers).length })
});
}
res.setHeader('Location', config.rest + '/sse/' + uuid);
// Done
res.sendStatus(201);
});

// Helper function to wait some time (needed for long poll)
async function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
}).catch(function() {});
};

// Long poll associated with an existing SSE
router.get('/sse/:uuid', async function(req, res) {
let uuid = req.params.uuid;
let subscriber = subscribers[uuid];
if(!uuid || !subscriber || !subscriber.sse) {
res.status(404);
res.send('Invalid subscription');
return;
}
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Connection', 'keep-alive');
res.write('retry: 2000\n\n');
while(subscriber.events) {
if(subscriber.events.length > 0) {
let event = subscriber.events.shift();
if(event.type && subscriber.sse && subscriber.sse[event.type]) {
res.write('event: ' + event.type + '\n');
res.write('data: ' + event.data + '\n\n');
}
} else {
await sleep(200);
}
}
});

// Get rid of an existing SSE
router.delete('/sse/:uuid', async function(req, res) {
let uuid = req.params.uuid;
let subscriber = subscribers[uuid];
if(!uuid || !subscriber || !subscriber.sse) {
res.status(404);
res.send('Invalid subscription');
return;
}
delete subscriber.sse;
delete subscriber.events;
// Done
res.sendStatus(200);
});

// Simple, non-standard, interface to destroy existing endpoints
router.delete('/endpoint/:id', function(req, res) {
let id = req.params.id;
Expand Down Expand Up @@ -543,5 +674,56 @@ function setupRest(app) {
app.use(bodyParser.json());
app.use(bodyParser.text({ type: 'application/sdp' }));
app.use(bodyParser.text({ type: 'application/trickle-ice-sdpfrag' }));
app.use(bodyParser.text({ type: 'application/json' }));
app.use(config.rest, router);
}

// Helper fucntion to monitor endpoints/mountpoints
function monitorEndpoint(endpoint) {
if(!endpoint)
return;
let id = endpoint.id;
setTimeout(function() {
let endpoint = endpoints[id];
if(!endpoint)
return;
if(!janus || !janus.isReady() || janus.getState() !== "connected") {
// Try again later
monitorEndpoint(endpoint);
return;
}
let details = {
whepId: endpoint.id,
mountpoint: endpoint.mountpoint
};
janus.isMountpointActive(details, function(err, res) {
if(err) {
// Try again later
whep.err(err);
monitorEndpoint(endpoint);
return;
}
if(res.active !== endpoint.active) {
// Notify endpoint status
endpoint.active = res.active;
notifyEndpointSubscribers(endpoint, {
type: (endpoint.active ? 'active' : 'inactive'),
data: JSON.stringify({})
});
}
// Done, schedule a new check for later
monitorEndpoint(endpoint);
});
}, 2000);
}

// Helper function to notify events to all subscribers of an endpoint
function notifyEndpointSubscribers(endpoint, event) {
if(!endpoint || !event)
return;
for(let uuid in endpoint.subscribers) {
let s = subscribers[uuid];
if(s && s.sse && s.events)
s.events.push(event);
}
}
Loading