Skip to content

Commit

Permalink
Review remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
noituri committed Nov 28, 2024
1 parent 522f888 commit a550383
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 105 deletions.
81 changes: 81 additions & 0 deletions ts/@live-compositor/web-wasm/src/input/frame.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { Frame } from '@live-compositor/browser-render';
import { FrameFormat } from '@live-compositor/browser-render';
import type { FrameWithPts } from './decoder/h264Decoder';
import { assert } from '../utils';

/**
* Represents frame produced by decoder.
* Memory has to be manually managed by incrementing reference count on `FrameRef` copy and decrementing it once it's no longer used
* `Input` manages memory in `getFrameRef()`
* `Queue` on tick pulls `FrameRef` for each input and once render finishes, decrements the ref count
*/
export class FrameRef {
private frame: FrameWithPts;
private refCount: number;
private downloadedFrame?: Frame;

public constructor(frame: FrameWithPts) {
this.frame = frame;
this.refCount = 1;
}

/**
* Increments reference count. Should be called every time the reference is copied.
*/
public incrementRefCount(): void {
assert(this.refCount > 0);
this.refCount++;
}

/**
* Decrements reference count. If reference count reaches 0, `FrameWithPts` is freed from the memory.
* It's unsafe to use the returned frame after `decrementRefCount()` call.
* Should be used after we're sure we no longer need the frame.
*/
public decrementRefCount(): void {
assert(this.refCount > 0);

this.refCount--;
if (this.refCount === 0) {
this.frame.frame.close();
}
}

/**
* Returns underlying frame. Fails if frame was freed from memory.
*/
public async getFrame(): Promise<Frame> {
assert(this.refCount > 0);

if (!this.downloadedFrame) {
this.downloadedFrame = await downloadFrame(this.frame);
}
return this.downloadedFrame;
}

public getPtsMs(): number {
return this.frame.ptsMs;
}
}

async function downloadFrame(frameWithPts: FrameWithPts): Promise<Frame> {
// Safari does not support conversion to RGBA
// Chrome does not support conversion to YUV
const isSafari = !!(window as any).safari;
const options = {
format: isSafari ? 'I420' : 'RGBA',
};

const frame = frameWithPts.frame;
const buffer = new Uint8ClampedArray(frame.allocationSize(options as VideoFrameCopyToOptions));
await frame.copyTo(buffer, options as VideoFrameCopyToOptions);

return {
resolution: {
width: frame.displayWidth,
height: frame.displayHeight,
},
format: isSafari ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES,
data: buffer,
};
}
125 changes: 74 additions & 51 deletions ts/@live-compositor/web-wasm/src/input/input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ import { CompositorEventType } from 'live-compositor';
import type { EventSender } from '../eventSender';
import type InputSource from './source';
import { Queue } from '@datastructures-js/queue';
import type { FrameWithPts } from './decoder/h264Decoder';
import { H264Decoder } from './decoder/h264Decoder';
import type { InputFrame } from './inputFrame';
import { intoInputFrame } from './inputFrame';
import { FrameRef } from './frame';
import { assert, framerateToDurationMs } from '../utils';

export type InputState = 'waiting_for_start' | 'buffering' | 'playing' | 'finished';

Expand All @@ -18,7 +17,7 @@ export class Input {
private source: InputSource;
private decoder: H264Decoder;
private eventSender: EventSender;
private frames: Queue<FrameWithPts>;
private frames: Queue<FrameRef>;
/**
* Queue PTS of the first frame
*/
Expand All @@ -31,7 +30,9 @@ export class Input {
this.eventSender = eventSender;
this.frames = new Queue();
this.decoder = new H264Decoder({
onFrame: frame => this.frames.push(frame),
onFrame: frame => {
this.frames.push(new FrameRef(frame));
},
});

this.source.registerCallbacks({
Expand All @@ -53,66 +54,76 @@ export class Input {
});
}

public async getFrame(currentQueuePts: number): Promise<InputFrame | undefined> {
this.enqueueChunks();
public async getFrameRef(currentQueuePts: number): Promise<FrameRef | undefined> {
if (this.state === 'buffering') {
this.handleBuffering();
return;
}
if (this.state !== 'playing') {
return undefined;
return;
}
if (!this.startPtsMs) {
this.startPtsMs = currentQueuePts;
}

await this.dropOldFrames(currentQueuePts);
this.dropOldFrames(currentQueuePts);
this.enqueueChunks(currentQueuePts);

const frame = this.frames.pop();
if (!frame) {
if (!this.source.isFinished()) {
return undefined;
}
let frame = this.peekFrame();
if (frame) {
return frame;
}

if (this.decoder.decodeQueueSize() === 0) {
// Source and Decoder finished their jobs
this.handleEos();
return undefined;
if (this.source.isFinished()) {
if (this.decoder.decodeQueueSize() !== 0) {
await this.decoder.flush();
return this.peekFrame();
}

await this.decoder.flush();
const frame = this.frames.pop();
return frame && intoInputFrame(frame);
this.handleEos();
return;
}

return intoInputFrame(frame);
return undefined;
}

/**
* Removes frames older than provided `currentQueuePts`
*/
private async dropOldFrames(currentQueuePts: number): Promise<void> {
if (!this.startPtsMs) {
this.startPtsMs = currentQueuePts;
private peekFrame(): FrameRef | undefined {
const frame = this.frames.front();
if (frame) {
frame.incrementRefCount();
return frame;
}

let frame = this.frames.front();
while (frame && this.framePtsToQueuePts(frame.ptsMs)! < currentQueuePts) {
console.warn(`Input "${this.id}": Frame dropped. PTS ${frame.ptsMs}`);
frame.frame.close();
this.enqueueChunks();
this.frames.pop();

frame = this.frames.front();
}
return undefined;
}

private framePtsToQueuePts(framePtsMs: number): number | undefined {
if (this.startPtsMs) {
return this.startPtsMs + framePtsMs;
/**
* Removes frames older than provided `currentQueuePts`
*/
private dropOldFrames(currentQueuePts: number): void {
const targetPts = this.queuePtsToInputPts(currentQueuePts);

const frames = this.frames.toArray();
let minPtsDiff = Number.MAX_VALUE;
let frameIndex = -1;
for (let i = 0; i < frames.length; i++) {
const framePts = frames[i].getPtsMs();
const diff = Math.abs(framePts - targetPts);
if (diff < minPtsDiff) {
minPtsDiff = diff;
frameIndex = i;
}
}

return undefined;
for (let i = 0; i < frameIndex; i++) {
const frame = this.frames.pop();
frame.decrementRefCount();
}
}

private handleBuffering() {
if (this.frames.size() < MAX_BUFFERING_SIZE) {
this.tryEnqueueChunk();
return;
}

Expand All @@ -133,16 +144,28 @@ export class Input {
this.decoder.close();
}

private enqueueChunks() {
while (
this.frames.size() < MAX_BUFFERING_SIZE &&
this.decoder.decodeQueueSize() < MAX_BUFFERING_SIZE
) {
const chunk = this.source.nextChunk();
if (!chunk) {
break;
}
this.decoder.decode(chunk);
private queuePtsToInputPts(queuePts: number): number {
const startTime = assert(this.startPtsMs);
return queuePts - startTime;
}

private tryEnqueueChunk() {
const chunk = this.source.nextChunk();
if (chunk) {
this.decoder.decode(chunk.data);
}
}

private enqueueChunks(currentQueuePts: number) {
const framrate = assert(this.source.getFramerate());
const frameDuration = framerateToDurationMs(framrate);
const targetPts = this.queuePtsToInputPts(currentQueuePts) + frameDuration * MAX_BUFFERING_SIZE;

let chunk = this.source.peekChunk();
while (chunk && chunk.ptsMs < targetPts) {
this.decoder.decode(chunk.data);
this.source.nextChunk();
chunk = this.source.peekChunk();
}
}
}
38 changes: 0 additions & 38 deletions ts/@live-compositor/web-wasm/src/input/inputFrame.ts

This file was deleted.

24 changes: 21 additions & 3 deletions ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import type { MP4ArrayBuffer, MP4File, MP4Info, Sample } from 'mp4box';
import MP4Box, { DataStream } from 'mp4box';
import type { SourcePayload } from '../source';
import { assert } from '../../utils';
import type { Framerate } from '../../compositor';

export type Mp4ReadyData = {
decoderConfig: VideoDecoderConfig;
framerate: Framerate;
};

export type MP4DemuxerCallbacks = {
onConfig: (config: VideoDecoderConfig) => void;
onReady: (data: Mp4ReadyData) => void;
onPayload: (payload: SourcePayload) => void;
};

Expand Down Expand Up @@ -46,18 +53,29 @@ export class MP4Demuxer {
const codecDescription = this.getCodecDescription(videoTrack.id);
this.samplesCount = videoTrack.nb_samples;

this.callbacks.onConfig({
const decoderConfig = {
codec: videoTrack.codec,
codedWidth: videoTrack.video.width,
codedHeight: videoTrack.video.height,
description: codecDescription,
};
const framerate = {
num: videoTrack.timescale,
den: 1000,
};

this.callbacks.onReady({
decoderConfig,
framerate,
});

this.file.setExtractionOptions(videoTrack.id);
this.file.start();
}

private onSamples(samples: Sample[]) {
const samplesCount = assert(this.samplesCount);

for (const sample of samples) {
const chunk = new EncodedVideoChunk({
type: sample.is_sync ? 'key' : 'delta',
Expand All @@ -68,7 +86,7 @@ export class MP4Demuxer {

this.callbacks.onPayload({ type: 'chunk', chunk: chunk });

if (sample.number === this.samplesCount! - 1) {
if (sample.number === samplesCount - 1) {
this.callbacks.onPayload({ type: 'eos' });
}
}
Expand Down
Loading

0 comments on commit a550383

Please sign in to comment.