diff --git a/examples/server-owned/src/index.js b/examples/server-owned/src/index.js
index ebf3046..76ef7c9 100644
--- a/examples/server-owned/src/index.js
+++ b/examples/server-owned/src/index.js
@@ -43,10 +43,20 @@ import { JanusWhepServer } from '../../../src/whep.js';
// Create a test endpoint using a static token
let endpoint = server.createEndpoint({ id: 'abc123', mountpoint: 1, token: 'verysecret' });
- endpoint.on('new-subscriber', function() {
- console.log(this.id + ': Endpoint has a new subscriber');
+ endpoint.on('new-subscriber', function(uuid) {
+ console.log(this.id + ': Endpoint has a new subscriber (' + uuid + ')');
+ endpoint.notifySubscribers({ event: 'viewercount', data: ''+endpoint.countSubscribers() });
});
- endpoint.on('subscriber-gone', function() {
- console.log(this.id + ': Endpoint subscriber left');
+ endpoint.on('subscriber-sse', function(uuid, events) {
+ if(events) {
+ console.log(this.id + ': Subscriber ' + uuid + ' subscribed to SSE:', events);
+ this.getSubscriber({ uuid: uuid }).notify({ event: 'viewercount', data: ''+endpoint.countSubscribers() });
+ } else {
+ console.log(this.id + ': Subscriber ' + uuid + ' unsubscribed from SSE');
+ }
+ });
+ endpoint.on('subscriber-gone', function(uuid) {
+ console.log(this.id + ': Endpoint subscriber left (' + uuid + ')');
+ endpoint.notifySubscribers({ event: 'viewercount', data: ''+endpoint.countSubscribers() });
});
}());
diff --git a/examples/server-shared/src/index.js b/examples/server-shared/src/index.js
index 1f07bb1..82b2644 100644
--- a/examples/server-shared/src/index.js
+++ b/examples/server-shared/src/index.js
@@ -42,13 +42,23 @@ import { JanusWhepServer } from '../../../src/whep.js';
await server.start();
// Create a test endpoint using a callback function to validate the token
- let endpoint = server.createEndpoint({ id: 'abc123', mountpoint: 1, token: function(authtoken) {
+ let endpoint = server.createEndpoint({ id: 'abc123', sse: ['viewercount'], mountpoint: 1, token: function(authtoken) {
return authtoken === 'verysecret';
}});
- endpoint.on('new-subscriber', function() {
- console.log(this.id + ': Endpoint has a new subscriber');
+ endpoint.on('new-subscriber', function(uuid) {
+ console.log(this.id + ': Endpoint has a new subscriber (' + uuid + ')');
+ endpoint.notifySubscribers({ event: 'viewercount', data: ''+endpoint.countSubscribers() });
});
- endpoint.on('subscriber-gone', function() {
- console.log(this.id + ': Endpoint subscriber left');
+ endpoint.on('subscriber-sse', function(uuid, events) {
+ if(events) {
+ console.log(this.id + ': Subscriber ' + uuid + ' subscribed to SSE:', events);
+ this.getSubscriber({ uuid: uuid }).notify({ event: 'viewercount', data: ''+endpoint.countSubscribers() });
+ } else {
+ console.log(this.id + ': Subscriber ' + uuid + ' unsubscribed from SSE');
+ }
+ });
+ endpoint.on('subscriber-gone', function(uuid) {
+ console.log(this.id + ': Endpoint subscriber left (' + uuid + ')');
+ endpoint.notifySubscribers({ event: 'viewercount', data: ''+endpoint.countSubscribers() });
});
}());
diff --git a/examples/web/index.js b/examples/web/index.js
index 5ce6f62..a940d5a 100644
--- a/examples/web/index.js
+++ b/examples/web/index.js
@@ -77,9 +77,24 @@ async function subscribeToEndpoint() {
console.log('Got SDP:', sdp);
resource = request.getResponseHeader('Location');
console.log('WHEP resource:', resource);
- // TODO Parse ICE servers
- // let ice = request.getResponseHeader('Link');
- let iceServers = [{urls: "stun:stun.l.google.com:19302"}];
+ // FIXME Parse Link headers (for ICE servers and/or SSE)
+ let iceServers = [];
+ let links = request.getResponseHeader('Link');
+ let l = links.split('<');
+ for(let i of l) {
+ if(!i || i.length === 0)
+ continue;
+ if(i.indexOf('ice-server') !== -1) {
+ // TODO Parse TURN attributes
+ let url = i.split('>')[0];
+ iceServers.push({ urls: url });
+ } else if(i.indexOf('urn:ietf:params:whep:ext:core:server-sent-events') !== -1) {
+ // TODO Parse event attribute
+ let url = i.split('>')[0];
+ let events = [ 'active', 'inactive', 'layers', 'viewercount' ];
+ startSSE(url, events);
+ }
+ }
// Create PeerConnection, if needed
createPeerConnectionIfNeeded(iceServers);
// Pass the SDP to the PeerConnection
@@ -222,7 +237,6 @@ function createPeerConnectionIfNeeded(iceServers) {
console.log('Handling Remote Track', event);
if(!event.streams)
return;
- console.warn(event.streams[0].getTracks());
if($('#whepvideo').length === 0) {
$('#video').removeClass('hide').show();
$('#videoremote').append('');
@@ -233,3 +247,41 @@ function createPeerConnectionIfNeeded(iceServers) {
$('#whepvideo').get(0).volume = 1;
};
}
+
+// Helper function to subscribe to events via SSE
+function startSSE(url, events) {
+ console.log('Starting SSE:', url);
+ $.ajax({
+ url: backend + url,
+ type: 'POST',
+ contentType: 'application/json',
+ data: JSON.stringify(events)
+ }).error(function(xhr, textStatus, errorThrown) {
+ bootbox.alert(xhr.status + ": " + xhr.responseText);
+ }).success(function(res, textStatus, request) {
+ // Done, access the Location header
+ let sse = request.getResponseHeader('Location');
+ console.log('SSE Location:', sse);
+ let source = new EventSource(sse);
+ source.addEventListener('active', message => {
+ updateSSE('active', message.data);
+ });
+ source.addEventListener('inactive', message => {
+ updateSSE('inactive', message.data);
+ });
+ source.addEventListener('viewercount', message => {
+ updateSSE('viewercount', message.data);
+ });
+ source.addEventListener('layer', message => {
+ updateSSE('layer', message.data);
+ });
+ });
+}
+
+function updateSSE(event, data) {
+ console.log('SSE: ' + event + ' = ' + data);
+ if($('#sse-' + event).length === 0)
+ $('.panel-title').append('');
+ $('#sse-' + event).text(event + ': ' + data);
+
+}
diff --git a/package.json b/package.json
index febfdae..0920301 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
{
"name": "janus-whep-server",
"description": "Simple Janus-based WHEP server library",
- "version": "1.0.0",
+ "version": "1.1.0",
"type": "module",
"keywords": [
"whep",
diff --git a/src/whep.js b/src/whep.js
index dc19a97..1185289 100644
--- a/src/whep.js
+++ b/src/whep.js
@@ -113,13 +113,25 @@ class JanusWhepServer extends EventEmitter {
return randomString;
}
- createEndpoint({ id, mountpoint, pin, label, token, iceServers }) {
- if(!id || !mountpoint)
+ createEndpoint({ id, sse, mountpoint, pin, label, token, iceServers }) {
+ if(!id || !mountpoint || (sse && !Array.isArray(sse)))
throw new Error('Invalid arguments');
if(this.endpoints.has(id))
throw new Error('Endpoint already exists');
+ let events = null;
+ if(sse) {
+ for(let event of sse) {
+ if(typeof event === 'string') {
+ if(!events)
+ events = new Map();
+ events.set(event, true);
+ }
+ }
+ }
let endpoint = new JanusWhepEndpoint({
+ server: this,
id: id,
+ sse: events,
mountpoint: mountpoint,
pin: pin,
label: label,
@@ -192,9 +204,11 @@ class JanusWhepServer extends EventEmitter {
let subscriber = this.subscribers.get(uuid);
if(!subscriber)
return;
+ delete subscriber.sse;
+ delete subscriber.events;
if(subscriber.handle) {
- endpoint.emit('subscriber-gone');
- this.emit('subscriber-gone', id);
+ endpoint.emit('subscriber-gone', uuid);
+ this.emit('subscriber-gone', id, uuid);
}
}, this);
endpoint.subscribers.clear();
@@ -277,10 +291,7 @@ class JanusWhepServer extends EventEmitter {
return;
}
let uuid = this.generateRandomString(16);
- let subscriber = {
- uuid: uuid,
- whepId: id
- };
+ let subscriber = new JanusWhepSubscriber({ uuid: uuid, whepId: id });
this.subscribers.set(uuid, subscriber);
// Create a new session
this.logger.info('[' + id + '] Subscribing to WHEP endpoint');
@@ -305,11 +316,13 @@ class JanusWhepServer extends EventEmitter {
if(subscriber) {
this.logger.info('[' + id + '][' + uuid + '] PeerConnection closed');
await subscriber.handle.detach().catch(_err => {});
+ delete subscriber.sse;
+ delete subscriber.events;
if(endpoint) {
endpoint.subscribers.delete(uuid);
- endpoint.emit('subscriber-gone');
+ endpoint.emit('subscriber-gone', uuid);
}
- this.emit('subscriber-gone', id);
+ this.emit('subscriber-gone', id, uuid);
this.subscribers.delete(uuid);
}
});
@@ -342,10 +355,11 @@ class JanusWhepServer extends EventEmitter {
res.setHeader('Accept-Patch', 'application/trickle-ice-sdpfrag');
res.setHeader('Location', subscriber.resource);
res.set('ETag', '"' + subscriber.latestEtag + '"');
+ // Check if we have Link headers to add to the response
+ let links = [];
let iceServers = endpoint.iceServers ? endpoint.iceServers : this.config.iceServers;
if(iceServers && iceServers.length > 0) {
// Add a Link header for each static ICE server
- let links = [];
for(let server of iceServers) {
if(!server.uri || (server.uri.indexOf('stun:') !== 0 &&
server.uri.indexOf('turn:') !== 0 &&
@@ -360,13 +374,28 @@ class JanusWhepServer extends EventEmitter {
}
links.push(link);
}
- res.setHeader('Link', links);
}
+ if(endpoint.sse) {
+ // Add a Link header to advertise support for SSE
+ let events = '';
+ endpoint.sse.forEach(function(_val, event) {
+ if(events.length === 0)
+ events += event;
+ else
+ events += ',' + event;
+ });
+ let link = '<' + this.config.rest.basePath + '/sse/' + uuid + '>; ' +
+ 'rel="urn:ietf:params:whep:ext:core:server-sent-events"; ' +
+ 'events="' + events + '"';
+ links.push(link);
+ }
+ if(links.length > 0)
+ res.setHeader('Link', links);
res.writeHeader(201, { 'Content-Type': 'application/sdp' });
res.write(result.jsep.sdp);
res.end();
- endpoint.emit('new-subscriber');
- this.emit('new-subscriber', id);
+ endpoint.emit('new-subscriber', uuid);
+ this.emit('new-subscriber', id, uuid);
} catch(err) {
this.logger.err('Error subscribing:', err);
this.subscribers.delete(uuid);
@@ -563,11 +592,13 @@ class JanusWhepServer extends EventEmitter {
// Get rid of the Janus subscriber
if(this.janus && subscriber.handle)
await subscriber.handle.detach().catch(_err => {});
+ delete subscriber.sse;
+ delete subscriber.events;
endpoint.subscribers.delete(uuid);
this.subscribers.delete(uuid);
this.logger.info('[' + uuid + '] Terminating WHEP session');
- endpoint.emit('subscriber-gone');
- this.emit('subscriber-gone', endpoint.id);
+ endpoint.emit('subscriber-gone', uuid);
+ this.emit('subscriber-gone', endpoint.id, uuid);
// Done
res.sendStatus(200);
});
@@ -586,6 +617,119 @@ class JanusWhepServer extends EventEmitter {
res.sendStatus(405);
});
+ // Create a SSE
+ router.post('/sse/:uuid', (req, res) => {
+ let uuid = req.params.uuid;
+ let subscriber = this.subscribers.get(uuid);
+ if(!uuid || !subscriber) {
+ res.status(404);
+ res.send('Invalid resource ID');
+ return;
+ }
+ let endpoint = this.endpoints.get(subscriber.whepId);
+ if(!endpoint) {
+ res.status(404);
+ res.send('Invalid WHEP endpoint');
+ return;
+ }
+ // Make sure we received a JSON array
+ if(req.headers['content-type'] !== 'application/json' || !Array.isArray(req.body)) {
+ res.status(406);
+ res.send('Unsupported content type');
+ return;
+ }
+ this.logger.verb('[' + endpoint.id + '] Subscriber ' + uuid + ' subscribing to SSE events:', req.body);
+ let events = undefined;
+ for(let ev of req.body) {
+ if(endpoint.sse.has(ev)) {
+ if(!subscriber.sse)
+ subscriber.sse = new Map();
+ if(!subscriber.events)
+ subscriber.events = [];
+ subscriber.sse.set(ev, true);
+ if(!events)
+ events = [];
+ events.push(ev);
+ }
+ }
+ this.logger.verb('[' + endpoint.id + '] Will send subscriber ' + uuid + ' these SSE events:', events);
+ if(!events) {
+ res.status(404);
+ res.send('No supported events provided');
+ return;
+ }
+ res.setHeader('Location', this.config.rest.basePath + '/sse/' + uuid);
+ endpoint.emit('subscriber-sse', uuid, events);
+ this.emit('subscriber-sse', endpoint.id, uuid, events);
+ // Done
+ res.sendStatus(201);
+ });
+
+ // Helper function to wait some time (needed for long poll)
+ async function sleep(ms) {
+ return new Promise((resolve) => {
+ setTimeout(resolve, ms);
+ }).catch(function() {});
+ };
+
+ // Long poll associated with an existing SSE
+ router.get('/sse/:uuid', async (req, res) => {
+ let uuid = req.params.uuid;
+ let subscriber = this.subscribers.get(uuid);
+ if(!uuid || !subscriber || !subscriber.sse) {
+ res.status(404);
+ res.send('Invalid subscription');
+ return;
+ }
+ let endpoint = this.endpoints.get(subscriber.whepId);
+ if(!endpoint) {
+ res.status(404);
+ res.send('Invalid WHEP endpoint');
+ return;
+ }
+ this.logger.verb('[' + endpoint.id + '] Subscriber ' + uuid + ' started SSE longpoll');
+ res.setHeader('Cache-Control', 'no-cache');
+ res.setHeader('Content-Type', 'text/event-stream');
+ res.setHeader('Connection', 'keep-alive');
+ res.write('retry: 2000\n\n');
+ while(subscriber.events) {
+ if(subscriber.events.length > 0) {
+ let ev = subscriber.events.shift();
+ if(ev.event && subscriber.sse && subscriber.sse.has(ev.event)) {
+ res.write('event: ' + ev.event + '\n');
+ res.write('data: ' + ev.data + '\n\n');
+ }
+ } else {
+ await sleep(200);
+ }
+ }
+ this.logger.verb('[' + endpoint.id + '] Subscriber ' + uuid + ' SSE longpoll interrupted');
+ res.end();
+ });
+
+ // Get rid of an existing SSE
+ router.delete('/sse/:uuid', (req, res) => {
+ let uuid = req.params.uuid;
+ let subscriber = this.subscribers.get(uuid);
+ if(!uuid || !subscriber || !subscriber.sse) {
+ res.status(404);
+ res.send('Invalid subscription');
+ return;
+ }
+ let endpoint = this.endpoints.get(subscriber.whepId);
+ if(!endpoint) {
+ res.status(404);
+ res.send('Invalid WHEP endpoint');
+ return;
+ }
+ delete subscriber.sse;
+ delete subscriber.events;
+ endpoint.emit('subscriber-sse', uuid);
+ this.emit('subscriber-sse', endpoint.id, uuid);
+ // Done
+ res.sendStatus(200);
+ });
+
// Setup CORS
app.use(cors({ preflightContinue: true }));
@@ -599,9 +743,11 @@ class JanusWhepServer extends EventEmitter {
// WHEP endpoint class
class JanusWhepEndpoint extends EventEmitter {
- constructor({ id, mountpoint, pin, label, token, iceServers }) {
+ constructor({ server, id, sse, mountpoint, pin, label, token, iceServers }) {
super();
+ this.server = server;
this.id = id;
+ this.sse = sse;
this.mountpoint = mountpoint;
this.pin = pin;
this.label = label;
@@ -621,7 +767,42 @@ class JanusWhepEndpoint extends EventEmitter {
list.push({ uuid: uuid });
});
return list;
+ }
+
+ getSubscriber({ uuid }) {
+ return this.server.subscribers.get(uuid);
+ }
+ notifySubscribers({ event, data }) {
+ if(!event || !data)
+ throw new Error('Invalid arguments');
+ if(!this.sse.has(event))
+ throw new Error('Unsupported event \'' + event + '\'');
+ this.subscribers.forEach(function(_val, uuid) {
+ let subscriber = this.server.subscribers.get(uuid);
+ if(!subscriber)
+ return;
+ subscriber.notify({ event, data });
+ }, this);
+ }
+}
+
+// WHEP subscriber class
+class JanusWhepSubscriber extends EventEmitter {
+ constructor({ whepId, uuid }) {
+ super();
+ this.whepId = whepId;
+ this.uuid = uuid;
+ }
+
+ notify({ event, data }) {
+ if(!this.sse || !this.events)
+ return;
+ if(!event || !data)
+ throw new Error('Invalid arguments');
+ if(!this.sse.has(event))
+ throw new Error('Unsupported event \'' + event + '\'');
+ this.events.push({ event, data });
}
}
@@ -676,5 +857,6 @@ class JanusWhepLogger {
// Exports
export {
JanusWhepServer,
- JanusWhepEndpoint
+ JanusWhepEndpoint,
+ JanusWhepSubscriber
};