This repository has been archived by the owner on Nov 4, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreport-demuxer.js
141 lines (135 loc) · 4.27 KB
/
report-demuxer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
const { decode } = require("base64-arraybuffer");
/**
 * Report types in output order: the position of a type in this list is the
 * index of the output slot its report is routed to. Reports whose type is not
 * listed go to one extra trailing "default" slot.
 */
const REPORT_TYPE_OUTPUTS = Object.freeze([
  "OBSERVER_EVENT",
  "CALL_EVENT",
  "CALL_META_DATA",
  "CLIENT_EXTENSION_DATA",
  "PEER_CONNECTION_TRANSPORT",
  "PEER_CONNECTION_DATA_CHANNEL",
  "INBOUND_AUDIO_TRACK",
  "INBOUND_VIDEO_TRACK",
  "OUTBOUND_AUDIO_TRACK",
  "OUTBOUND_VIDEO_TRACK",
  "SFU_EVENT",
  "SFU_META_DATA",
  "SFU_TRANSPORT",
  "SFU_RTP_SOURCE_STREAM",
  "SFU_RTP_SINK_STREAM",
  "SFU_SCTP_STREAM",
]);

/**
 * Decodes a report envelope and routes the decoded report to the output slot
 * that matches its type.
 */
class ReportDemuxer {
  /**
   * Decodes `message.payload.payload` (base64 text wrapping a JSON document)
   * and places the result in the slot belonging to `message.payload.type`.
   *
   * @param {?{payload: ?{type: string, payload: string}}} message - Envelope
   *   whose `payload` carries the report `type` and the base64 `payload`.
   * @returns {?Array<?{payload: *}>} An array with one entry per known report
   *   type plus a trailing default slot; exactly one entry holds
   *   `{ payload: <parsed JSON> }`, the rest are `null`. Returns `null` when
   *   the envelope is incomplete or the payload cannot be decoded/parsed.
   */
  process(message) {
    const report = message ? message.payload : null;
    if (!report) {
      return null;
    }
    const { type, payload } = report;
    if (!type || !payload) {
      return null;
    }

    let demuxedReport;
    try {
      // Decode the bytes as UTF-8 text. Mapping bytes one-by-one through
      // String.fromCharCode corrupts multi-byte characters, and spreading a
      // large payload as call arguments can exceed the engine's argument limit.
      // NOTE(review): assumes the producer serializes the JSON as UTF-8 — confirm.
      const bytes = Buffer.from(decode(payload));
      demuxedReport = {
        payload: JSON.parse(bytes.toString("utf8")),
      };
    } catch (err) {
      // Best effort: a malformed report is logged and dropped.
      console.warn(err);
      return null;
    }

    const knownIndex = REPORT_TYPE_OUTPUTS.indexOf(type);
    // Unknown types land in the extra slot after the known ones.
    const outputIndex = knownIndex === -1 ? REPORT_TYPE_OUTPUTS.length : knownIndex;
    const output = new Array(REPORT_TYPE_OUTPUTS.length + 1).fill(null);
    output[outputIndex] = demuxedReport;
    return output;
  }
}
/**
 * Builds the function that converts an incoming message into the envelope
 * shape `{ payload }` for the given report source.
 *
 * For "kafka", the returned function reads `msg.payload.value` and JSON-parses
 * it; a missing or falsy value yields `{ payload: null }`. Any other source
 * logs a warning once and yields a pass-through function.
 *
 * @param {string} reportSource - Name of the source the messages come from.
 * @returns {function(Object): Object} Message transformer.
 */
const makeTransformer = reportSource => {
  if (reportSource !== "kafka") {
    console.warn("Unknown report source", reportSource);
    return msg => msg;
  }
  return msg => {
    // Falsy payload or falsy value both collapse to a null payload.
    const raw = msg.payload && msg.payload.value;
    return { payload: raw ? JSON.parse(raw) : null };
  };
};
module.exports = function(RED) {
function ReportDemuxerBuilder(config) {
RED.nodes.createNode(this, config);
const node = this;
const demuxer = new ReportDemuxer();
const transform = makeTransformer(config.reportSource || "kafka");
// const flowContext = node.context().flow;
node.on('input', (msg) => {
const message = transform(msg);
const result = demuxer.process(message);
if (result) {
node.send(result);
}
});
}
RED.nodes.registerType("report-demuxer", ReportDemuxerBuilder);
}