Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: audio video stream missing data detector #30

Merged
merged 11 commits into from
Jan 23, 2025
2 changes: 2 additions & 0 deletions src/WebRTCIssueDetector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from './detectors';
import { CompositeRTCStatsParser, RTCStatsParser } from './parser';
import createLogger from './utils/logger';
import MissingStreamDataDetector from './detectors/MissingStreamDataDetector';

class WebRTCIssueDetector {
readonly eventEmitter: WebRTCIssueEmitter;
Expand Down Expand Up @@ -67,6 +68,7 @@ class WebRTCIssueDetector {
new AvailableOutgoingBitrateIssueDetector(),
new UnknownVideoDecoderImplementationDetector(),
new FrozenVideoTrackDetector(),
new MissingStreamDataDetector(),
evgmel marked this conversation as resolved.
Show resolved Hide resolved
];

this.networkScoresCalculator = params.networkScoresCalculator ?? new DefaultNetworkScoresCalculator();
Expand Down
113 changes: 113 additions & 0 deletions src/detectors/MissingStreamDataDetector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import {
CommonParsedInboundStreamStats,
IssueDetectorResult,
IssuePayload,
IssueReason,
IssueType,
WebRTCStatsParsed,
} from '../types';
import BaseIssueDetector from './BaseIssueDetector';

interface MissingStreamDetectorParams {
timeoutMs?: number;
}

export default class MissingStreamDataDetector extends BaseIssueDetector {
readonly #lastMarkedAt = new Map<string, number>();
evgmel marked this conversation as resolved.
Show resolved Hide resolved

readonly #timeoutMs: number;

constructor(params: MissingStreamDetectorParams = {}) {
super();
this.#timeoutMs = params.timeoutMs ?? 5_000;
}

performDetection(data: WebRTCStatsParsed): IssueDetectorResult {
const { connection: { id: connectionId } } = data;
const issues = this.processData(data);
this.setLastProcessedStats(connectionId, data);
return issues;
}

private processData(data: WebRTCStatsParsed): IssueDetectorResult {
const issues: IssueDetectorResult = [];

const { video: { inbound: newVideoInbound } } = data;
const { audio: { inbound: newAudioInbound } } = data;

issues.push(...this.detectMissingData(
newAudioInbound as unknown as CommonParsedInboundStreamStats[],
IssueType.Stream,
IssueReason.MissingAudioStreamData,
));
issues.push(...this.detectMissingData(
newVideoInbound,
IssueType.Stream,
IssueReason.MissingVideoStreamData,
));

const unvisitedTrackIds = new Set(this.#lastMarkedAt.keys());
evgmel marked this conversation as resolved.
Show resolved Hide resolved

unvisitedTrackIds.forEach((trackId) => {
const lastMarkedAt = this.#lastMarkedAt.get(trackId);
if (lastMarkedAt && Date.now() - lastMarkedAt > this.#timeoutMs) {
this.removeMarkIssue(trackId);
}
});

return issues;
}

private detectMissingData(
commonStreamStats: CommonParsedInboundStreamStats[],
type: IssueType,
reason: IssueReason,
): IssueDetectorResult {
const issues: IssuePayload[] = [];

commonStreamStats.forEach((inboundItem) => {
const trackId = inboundItem.track.trackIdentifier;

if (inboundItem.bytesReceived === 0 && !inboundItem.track.detached && !inboundItem.track.ended) {
const hasIssue = this.markIssue(trackId);
evgmel marked this conversation as resolved.
Show resolved Hide resolved

if (!hasIssue) {
return;
}

const statsSample = {
bytesReceived: inboundItem.bytesReceived,
trackDetached: inboundItem.track.detached,
eugeny-dementev marked this conversation as resolved.
Show resolved Hide resolved
trackEnded: inboundItem.track.ended,
};

issues.push({
type,
reason,
statsSample,
trackIdentifier: trackId,
});
} else {
this.removeMarkIssue(trackId);
}
});

return issues;
}

private markIssue(trackId: string): boolean {
const now = Date.now();
const lastMarkedAt = this.#lastMarkedAt.get(trackId);

if (!lastMarkedAt || now - lastMarkedAt > this.#timeoutMs) {
this.#lastMarkedAt.set(trackId, now);
return true;
}

return false;
}

private removeMarkIssue(trackId: string): void {
evgmel marked this conversation as resolved.
Show resolved Hide resolved
this.#lastMarkedAt.delete(trackId);
}
}
14 changes: 14 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ export enum IssueReason {
LowInboundMOS = 'low-inbound-mean-opinion-score',
LowOutboundMOS = 'low-outbound-mean-opinion-score',
FrozenVideoTrack = 'frozen-video-track',
MissingVideoStreamData = 'missing-video-stream-data',
MissingAudioStreamData = 'missing-audio-stream-data',
}

export type IssuePayload = {
Expand Down Expand Up @@ -433,3 +435,15 @@ export interface Logger {
warn: (msg: any, ...meta: any[]) => void;
error: (msg: any, ...meta: any[]) => void;
}

type CommonKeys<T, U> = Extract<keyof T, keyof U>;

type CommonFields<T, U> = {
[K in CommonKeys<T, U>]: T[K] extends object
? U[K] extends object
? CommonFields<T[K], U[K]> // Recursively check nested objects
: never
: T[K];
};

export type CommonParsedInboundStreamStats = CommonFields<ParsedInboundVideoStreamStats, ParsedInboundAudioStreamStats>;
Loading