Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[web-wasm] Handle EOS and inputs with different framerates #878

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import { Queue } from '@datastructures-js/queue';
/**
 * Decoded video frame paired with the presentation timestamp computed by the decoder wrapper.
 */
export type FrameWithPts = {
// Decoded frame; the frame's own `timestamp` field is excluded from this type
frame: Omit<VideoFrame, 'timestamp'>;
// Presentation timestamp, in milliseconds per the field name
ptsMs: number;
};

const MAX_DECODED_FRAMES = 3;
/**
 * Construction options for `H264Decoder`.
 */
export type H264DecoderProps = {
// Callback that receives each decoded frame together with its PTS
onFrame: (frame: FrameWithPts) => void;
};

export class H264Decoder {
private chunks: Queue<EncodedVideoChunk>;
private frames: Queue<VideoFrame>;
private decoder: VideoDecoder;
private ptsOffset?: number;

public constructor() {
this.chunks = new Queue();
this.frames = new Queue();
public constructor(props: H264DecoderProps) {
// TODO(noituri): Use web workers
this.decoder = new VideoDecoder({
output: frame => {
this.frames.push(frame);
},
output: videoFrame => props.onFrame(this.intoFrameWithPts(videoFrame)),
error: error => {
console.error(`MP4Decoder error: ${error}`);
console.error(`H264Decoder error: ${error}`);
},
});
}
Expand All @@ -25,29 +25,31 @@ export class H264Decoder {
this.decoder.configure(config);
}

public enqueueChunk(chunk: EncodedVideoChunk) {
this.chunks.push(chunk);
this.decodeChunks();
/**
 * Passes an encoded chunk to the underlying `VideoDecoder`.
 */
public decode(chunk: EncodedVideoChunk) {
this.decoder.decode(chunk);
}

/**
* Returns decoded video frames. Frames have to be manually freed from memory
*/
public getFrame(): VideoFrame | undefined {
this.decodeChunks();
return this.frames.pop();
/**
 * Returns the underlying `VideoDecoder`'s `decodeQueueSize`.
 */
public decodeQueueSize(): number {
return this.decoder.decodeQueueSize;
}

private decodeChunks() {
while (
this.frames.size() < MAX_DECODED_FRAMES &&
this.decoder.decodeQueueSize < MAX_DECODED_FRAMES
) {
const chunk = this.chunks.pop();
if (!chunk) {
break;
}
this.decoder.decode(chunk);
/**
 * Wraps a decoded `VideoFrame` with a PTS expressed relative to the first decoded frame.
 * The first frame seen fixes the offset, so its PTS comes out as 0.
 */
private intoFrameWithPts(videoFrame: VideoFrame): FrameWithPts {
  const timestamp = videoFrame.timestamp;
  if (this.ptsOffset === undefined) {
    // Remember the negated timestamp of the first frame as the offset
    this.ptsOffset = -timestamp;
  }

  // TODO(noituri): Handle pts roller
  const ptsMs = (this.ptsOffset + timestamp) / 1000;
  return { frame: videoFrame, ptsMs };
}

/**
 * Awaits the underlying `VideoDecoder`'s `flush()`.
 */
public async flush(): Promise<void> {
await this.decoder.flush();
}

/**
 * Closes the underlying `VideoDecoder`.
 */
public close() {
this.decoder.close();
}
}
81 changes: 81 additions & 0 deletions ts/@live-compositor/web-wasm/src/input/frame.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { Frame } from '@live-compositor/browser-render';
import { FrameFormat } from '@live-compositor/browser-render';
import type { FrameWithPts } from './decoder/h264Decoder';
import { assert } from '../utils';

/**
 * Reference-counted handle to a frame produced by the decoder.
 * Memory has to be managed manually: increment the reference count whenever a `FrameRef`
 * is copied and decrement it once that copy is no longer used.
 * `Input` manages memory in `getFrameRef()`
 * `Queue` on tick pulls `FrameRef` for each input and once render finishes, decrements the ref count
 */
export class FrameRef {
  private frame: FrameWithPts;
  private refCount: number;
  /**
   * In-flight or completed download of the frame data. Caching the promise (not just the
   * result) lets overlapping `getFrame()` calls share a single download.
   */
  private pendingDownload?: Promise<Frame>;

  public constructor(frame: FrameWithPts) {
    this.frame = frame;
    this.refCount = 1;
  }

  /**
   * Increments reference count. Should be called every time the reference is copied.
   * Fails if the frame was already freed.
   */
  public incrementRefCount(): void {
    assert(this.refCount > 0);
    this.refCount++;
  }

  /**
   * Decrements reference count. If reference count reaches 0, the underlying frame is closed.
   * It's unsafe to use the frame after the final `decrementRefCount()` call.
   * Should be used after we're sure we no longer need the frame.
   */
  public decrementRefCount(): void {
    assert(this.refCount > 0);

    this.refCount--;
    if (this.refCount === 0) {
      this.frame.frame.close();
    }
  }

  /**
   * Returns the downloaded frame data. Fails if the frame was freed from memory.
   * The download is started at most once; concurrent callers receive the same result.
   * If the download fails, the error is propagated and the next call retries.
   */
  public async getFrame(): Promise<Frame> {
    assert(this.refCount > 0);

    if (!this.pendingDownload) {
      this.pendingDownload = downloadFrame(this.frame).catch((error: unknown) => {
        // Do not cache failures, so a later call can try again
        this.pendingDownload = undefined;
        throw error;
      });
    }
    return this.pendingDownload;
  }

  /**
   * Returns the PTS stored with the frame.
   */
  public getPtsMs(): number {
    return this.frame.ptsMs;
  }
}

/**
 * Copies the pixel data of a decoded frame into a freshly allocated buffer.
 * The target pixel format depends on the browser: I420 when `window.safari` is present, RGBA otherwise.
 */
async function downloadFrame(frameWithPts: FrameWithPts): Promise<Frame> {
  // Safari does not support conversion to RGBA
  // Chrome does not support conversion to YUV
  const isSafari = Boolean((window as any).safari);
  const copyOptions = { format: isSafari ? 'I420' : 'RGBA' } as VideoFrameCopyToOptions;

  const videoFrame = frameWithPts.frame;
  const byteLength = videoFrame.allocationSize(copyOptions);
  const pixels = new Uint8ClampedArray(byteLength);
  await videoFrame.copyTo(pixels, copyOptions);

  const resolution = {
    width: videoFrame.displayWidth,
    height: videoFrame.displayHeight,
  };
  const format = isSafari ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES;

  return { resolution, format, data: pixels };
}
175 changes: 150 additions & 25 deletions ts/@live-compositor/web-wasm/src/input/input.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,51 @@
import type { Frame, InputId } from '@live-compositor/browser-render';
import type { InputId } from '@live-compositor/browser-render';
import { CompositorEventType } from 'live-compositor';
import type { EventSender } from '../eventSender';
import type InputSource from './source';

/**
* Represents frame produced by decoder.
* `InputFrame` has to be manually freed from the memory by calling `free()` method. Once freed it no longer can be used.
* `Queue` on tick pulls `InputFrame` for each input and once render finishes, manually frees `InputFrame`s.
*/
export type InputFrame = Frame & {
/**
* Frees `InputFrame` from memory. `InputFrame` can not be used after `free()`.
*/
free: () => void;
};
import { Queue } from '@datastructures-js/queue';
import { H264Decoder } from './decoder/h264Decoder';
import { FrameRef } from './frame';
import { assert, framerateToDurationMs } from '../utils';

/**
 * Lifecycle state of an `Input`. The constructor sets 'waiting_for_start', `start()` moves to
 * 'buffering', `handleBuffering()` moves to 'playing' and `handleEos()` moves to 'finished'.
 */
export type InputState = 'waiting_for_start' | 'buffering' | 'playing' | 'finished';

// Number of decoded frames to collect before playback starts; also used as the
// look-ahead (in frame durations) when feeding chunks to the decoder
const MAX_BUFFERING_SIZE = 3;

export class Input {
private id: InputId;
private source: InputSource;
private state: InputState;
private source: InputSource;
private decoder: H264Decoder;
private eventSender: EventSender;
private frames: Queue<FrameRef>;
/**
* Queue PTS of the first frame
*/
private startPtsMs?: number;

/**
 * Creates an input in the 'waiting_for_start' state, wires the decoder output into the
 * frame buffer and registers a callback on the source for decoder configuration.
 */
public constructor(id: InputId, source: InputSource, eventSender: EventSender) {
this.id = id;
this.state = 'waiting_for_start';
this.source = source;
this.eventSender = eventSender;
this.frames = new Queue();
this.decoder = new H264Decoder({
// Every decoded frame is wrapped in a `FrameRef` and appended to the buffer
onFrame: frame => {
this.frames.push(new FrameRef(frame));
},
});

// Forward the decoder config reported by the source to the decoder
this.source.registerCallbacks({
onDecoderConfig: config => this.decoder.configure(config),
});
}

public start() {
if (this.state !== 'waiting_for_start') {
console.warn(`Tried to start an already started input "${this.id}"`);
return;
}

this.source.start();
this.state = 'buffering';
this.eventSender.sendEvent({
Expand All @@ -43,18 +54,132 @@ export class Input {
});
}

public async getFrame(): Promise<InputFrame | undefined> {
const frame = await this.source.getFrame();
// TODO(noituri): Handle this better
if (frame && this.state === 'buffering') {
this.state = 'playing';
this.eventSender.sendEvent({
type: CompositorEventType.VIDEO_INPUT_PLAYING,
inputId: this.id,
});
public async getFrameRef(currentQueuePts: number): Promise<FrameRef | undefined> {
if (this.state === 'buffering') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading name: I'm not sure what this function does, but it definitely does a lot more than getting a value.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline

this.handleBuffering();
return;
}
if (this.state !== 'playing') {
return;
}
if (this.startPtsMs === undefined) {
this.startPtsMs = currentQueuePts;
}

this.dropOldFrames(currentQueuePts);
this.enqueueChunks(currentQueuePts);

// No more chunks will be produced. Flush all the remaining frames from the decoder
if (this.source.isFinished() && this.decoder.decodeQueueSize() !== 0) {
await this.decoder.flush();
}

let frame: FrameRef | undefined;
if (this.source.isFinished() && this.frames.size() == 1) {
// Last frame is not popped by `dropOldFrames`
frame = this.frames.pop();
} else {
frame = this.getLatestFrame();
}

if (frame) {
return frame;
}

// Source received EOS & there is no more frames
if (this.source.isFinished()) {
this.handleEos();
return;
}

return undefined;
}

/**
* Retrieves latest frame and increments its reference count
*/
private getLatestFrame(): FrameRef | undefined {
const frame = this.frames.front();
if (frame) {
frame.incrementRefCount();
return frame;
}
// TODO(noituri): Handle EOS

return frame;
return undefined;
}

/**
 * Finds the buffered frame with PTS closest to `currentQueuePts` (converted to input time)
 * and releases the frames at the head of the buffer that are older than it.
 *
 * Frames are removed strictly from the head, and the frame whose reference count is
 * decremented is always the one that was dequeued. The two can no longer diverge if the
 * buffer is ever not in PTS order.
 */
private dropOldFrames(currentQueuePts: number): void {
  if (this.frames.isEmpty()) {
    return;
  }

  const bufferedFrames = this.frames.toArray();
  const targetPts = this.queuePtsToInputPts(currentQueuePts);

  const closestFrame = bufferedFrames.reduce((best, candidate) => {
    const bestDiff = Math.abs(best.getPtsMs() - targetPts);
    const candidateDiff = Math.abs(candidate.getPtsMs() - targetPts);
    // On a tie the later frame wins
    return bestDiff < candidateDiff ? best : candidate;
  });
  const closestPts = closestFrame.getPtsMs();

  // The closest frame is never older than itself, so the loop stops at it at the latest
  let head = this.frames.front();
  while (head && head.getPtsMs() < closestPts) {
    this.frames.pop();
    head.decrementRefCount();
    head = this.frames.front();
  }
}

/**
 * Drives the 'buffering' state: keeps feeding the decoder one chunk per call until
 * `MAX_BUFFERING_SIZE` frames are buffered, then switches to 'playing' and emits
 * `VIDEO_INPUT_PLAYING`.
 */
private handleBuffering() {
  const bufferedFrames = this.frames.size();
  if (bufferedFrames >= MAX_BUFFERING_SIZE) {
    this.state = 'playing';
    this.eventSender.sendEvent({
      type: CompositorEventType.VIDEO_INPUT_PLAYING,
      inputId: this.id,
    });
    return;
  }

  // Not enough decoded frames yet
  this.tryEnqueueChunk();
}

/**
 * Marks the input as 'finished', emits `VIDEO_INPUT_EOS` for this input and closes the decoder.
 */
private handleEos() {
this.state = 'finished';
this.eventSender.sendEvent({
type: CompositorEventType.VIDEO_INPUT_EOS,
inputId: this.id,
});

this.decoder.close();
}

/**
 * Converts a queue PTS into this input's timeline by subtracting the queue PTS recorded
 * in `startPtsMs`. Fails if `startPtsMs` has not been set yet.
 */
private queuePtsToInputPts(queuePts: number): number {
  const startPtsMs = this.startPtsMs;
  assert(startPtsMs !== undefined);
  return queuePts - startPtsMs;
}

/**
 * Takes the next chunk from the source, if there is one, and sends it to the decoder.
 */
private tryEnqueueChunk() {
  const nextChunk = this.source.nextChunk();
  if (!nextChunk) {
    return;
  }
  this.decoder.decode(nextChunk.data);
}

/**
 * Sends to the decoder every pending chunk whose PTS is below the current input PTS plus
 * `MAX_BUFFERING_SIZE` frame durations. Fails if the source reports no framerate.
 */
private enqueueChunks(currentQueuePts: number) {
  const framerate = this.source.getFramerate();
  assert(framerate);

  const lookaheadMs = framerateToDurationMs(framerate) * MAX_BUFFERING_SIZE;
  const targetPts = this.queuePtsToInputPts(currentQueuePts) + lookaheadMs;

  for (
    let pending = this.source.peekChunk();
    pending && pending.ptsMs < targetPts;
    pending = this.source.peekChunk()
  ) {
    this.decoder.decode(pending.data);
    // Consume the chunk that was just peeked
    this.source.nextChunk();
  }
}
}
Loading
Loading