Skip to content

Commit

Permalink
feat: audio video stream missing data detector (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeny-dementev authored Jan 23, 2025
1 parent 40c47fd commit a9985a1
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 0 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ const exampleIssue = {
}
```


### MissingStreamDataDetector
Detects issues with missing data in active inbound streams
```ts
const exampleIssue = {
type: 'stream',
reason: 'missing-video-stream-data' | 'missing-audio-stream-data',
trackIdentifier: 'some-track-id',
statsSample: {
bytesReceivedDelta: 0, // always zero if issue detected
bytesReceived: 2392384,
trackDetached: false,
trackEnded: false,
},
}
```

## Roadmap

- [ ] Adaptive getStats() call interval based on last getStats() execution time
Expand Down
2 changes: 2 additions & 0 deletions src/WebRTCIssueDetector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from './detectors';
import { CompositeRTCStatsParser, RTCStatsParser } from './parser';
import createLogger from './utils/logger';
import MissingStreamDataDetector from './detectors/MissingStreamDataDetector';

class WebRTCIssueDetector {
readonly eventEmitter: WebRTCIssueEmitter;
Expand Down Expand Up @@ -67,6 +68,7 @@ class WebRTCIssueDetector {
new AvailableOutgoingBitrateIssueDetector(),
new UnknownVideoDecoderImplementationDetector(),
new FrozenVideoTrackDetector(),
new MissingStreamDataDetector(),
];

this.networkScoresCalculator = params.networkScoresCalculator ?? new DefaultNetworkScoresCalculator();
Expand Down
165 changes: 165 additions & 0 deletions src/detectors/MissingStreamDataDetector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import {
CommonParsedInboundStreamStats,
IssueDetectorResult,
IssuePayload,
IssueReason,
IssueType,
WebRTCStatsParsed,
} from '../types';
import BaseIssueDetector from './BaseIssueDetector';

interface MissingStreamDetectorParams {
timeoutMs?: number; // delay to report the issue no more often then once per specified timeout
steps?: number; // number of last stats to check
}

export default class MissingStreamDataDetector extends BaseIssueDetector {
readonly #lastMarkedAt = new Map<string, number>();

readonly #timeoutMs: number;

readonly #steps: number;

constructor(params: MissingStreamDetectorParams = {}) {
super();
this.#timeoutMs = params.timeoutMs ?? 15_000;
this.#steps = params.steps ?? 3;
}

performDetection(data: WebRTCStatsParsed): IssueDetectorResult {
const { connection: { id: connectionId } } = data;
const issues = this.processData(data);
this.setLastProcessedStats(connectionId, data);
return issues;
}

private processData(data: WebRTCStatsParsed): IssueDetectorResult {
const issues: IssueDetectorResult = [];

const allLastProcessedStats = [...this.getAllLastProcessedStats(data.connection.id), data];
if (allLastProcessedStats.length < this.#steps) {
return issues;
}

const lastNProcessedStats = allLastProcessedStats.slice(-this.#steps);

const lastNVideoInbound = lastNProcessedStats.map((stats) => stats.video.inbound);
const lastNAudioInbound = lastNProcessedStats.map((stats) => stats.audio.inbound);

issues.push(...this.detectMissingData(
lastNAudioInbound as unknown as CommonParsedInboundStreamStats[][],
IssueType.Stream,
IssueReason.MissingAudioStreamData,
));

issues.push(...this.detectMissingData(
lastNVideoInbound,
IssueType.Stream,
IssueReason.MissingVideoStreamData,
));

const unvisitedTrackIds = new Set(this.#lastMarkedAt.keys());

unvisitedTrackIds.forEach((trackId) => {
const lastMarkedAt = this.#lastMarkedAt.get(trackId);
if (lastMarkedAt && Date.now() - lastMarkedAt > this.#timeoutMs) {
this.removeMarkedIssue(trackId);
}
});

return issues;
}

private detectMissingData(
lastNInboundStats: CommonParsedInboundStreamStats[][],
type: IssueType,
reason: IssueReason,
): IssueDetectorResult {
const issues: IssuePayload[] = [];

const currentInboundStats = lastNInboundStats.pop()!;
const prevInboundItemsByTrackId = MissingStreamDataDetector.mapStatsByTrackId(lastNInboundStats);

currentInboundStats.forEach((inboundItem) => {
const trackId = inboundItem.track.trackIdentifier;

const prevInboundItems = prevInboundItemsByTrackId.get(trackId);

if (!Array.isArray(prevInboundItems) || prevInboundItems.length === 0) {
return;
}

if (inboundItem.track.detached || inboundItem.track.ended) {
return;
}

if (!MissingStreamDataDetector.isAllBytesReceivedDidntChange(inboundItem.bytesReceived, prevInboundItems)) {
this.removeMarkedIssue(trackId);
return;
}

const issueMarked = this.markIssue(trackId);

if (!issueMarked) {
return;
}

const statsSample = {
bytesReceived: inboundItem.bytesReceived,
};

issues.push({
type,
reason,
statsSample,
trackIdentifier: trackId,
});
});

return issues;
}

private static mapStatsByTrackId(
items: CommonParsedInboundStreamStats[][],
): Map<string, CommonParsedInboundStreamStats[]> {
const statsById = new Map<string, CommonParsedInboundStreamStats[]>();
items.forEach((inboundItems) => {
inboundItems.forEach((inbountItem) => {
const accumulatedItems = statsById.get(inbountItem.track.trackIdentifier) || [];
accumulatedItems.push(inbountItem);
statsById.set(inbountItem.track.trackIdentifier, accumulatedItems);
});
});

return statsById;
}

private static isAllBytesReceivedDidntChange(
bytesReceived: number, inboundItems: CommonParsedInboundStreamStats[],
): boolean {
for (let i = 0; i < inboundItems.length; i += 1) {
const inboundItem = inboundItems[i];
if (inboundItem.bytesReceived !== bytesReceived) {
return false;
}
}

return true;
}

private markIssue(trackId: string): boolean {
const now = Date.now();
const lastMarkedAt = this.#lastMarkedAt.get(trackId);

if (!lastMarkedAt || now - lastMarkedAt > this.#timeoutMs) {
this.#lastMarkedAt.set(trackId, now);
return true;
}

return false;
}

private removeMarkedIssue(trackId: string): void {
this.#lastMarkedAt.delete(trackId);
}
}
14 changes: 14 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ export enum IssueReason {
LowInboundMOS = 'low-inbound-mean-opinion-score',
LowOutboundMOS = 'low-outbound-mean-opinion-score',
FrozenVideoTrack = 'frozen-video-track',
MissingVideoStreamData = 'missing-video-stream-data',
MissingAudioStreamData = 'missing-audio-stream-data',
}

export type IssuePayload = {
Expand Down Expand Up @@ -433,3 +435,15 @@ export interface Logger {
warn: (msg: any, ...meta: any[]) => void;
error: (msg: any, ...meta: any[]) => void;
}

type CommonKeys<T, U> = Extract<keyof T, keyof U>;

type CommonFields<T, U> = {
[K in CommonKeys<T, U>]: T[K] extends object
? U[K] extends object
? CommonFields<T[K], U[K]> // Recursively check nested objects
: never
: T[K];
};

export type CommonParsedInboundStreamStats = CommonFields<ParsedInboundVideoStreamStats, ParsedInboundAudioStreamStats>;

0 comments on commit a9985a1

Please sign in to comment.