diff --git a/ts/@live-compositor/web-wasm/src/input/frame.ts b/ts/@live-compositor/web-wasm/src/input/frame.ts new file mode 100644 index 000000000..7eeed00c8 --- /dev/null +++ b/ts/@live-compositor/web-wasm/src/input/frame.ts @@ -0,0 +1,81 @@ +import type { Frame } from '@live-compositor/browser-render'; +import { FrameFormat } from '@live-compositor/browser-render'; +import type { FrameWithPts } from './decoder/h264Decoder'; +import { assert } from '../utils'; + +/** + * Represents frame produced by decoder. + * Memory has to be manually managed by incrementing reference count on `FrameRef` copy and decrementing it once it's no longer used + * `Input` manages memory in `getFrameRef()` + * `Queue` on tick pulls `FrameRef` for each input and once render finishes, decrements the ref count + */ +export class FrameRef { + private frame: FrameWithPts; + private refCount: number; + private downloadedFrame?: Frame; + + public constructor(frame: FrameWithPts) { + this.frame = frame; + this.refCount = 1; + } + + /** + * Increments reference count. Should be called every time the reference is copied. + */ + public incrementRefCount(): void { + assert(this.refCount > 0); + this.refCount++; + } + + /** + * Decrements reference count. If reference count reaches 0, `FrameWithPts` is freed from the memory. + * It's unsafe to use the returned frame after `decrementRefCount()` call. + * Should be used after we're sure we no longer need the frame. + */ + public decrementRefCount(): void { + assert(this.refCount > 0); + + this.refCount--; + if (this.refCount === 0) { + this.frame.frame.close(); + } + } + + /** + * Returns underlying frame. Fails if frame was freed from memory. + */ + public async getFrame(): Promise { + assert(this.refCount > 0); + + if (!this.downloadedFrame) { + this.downloadedFrame = await downloadFrame(this.frame); + } + return this.downloadedFrame; + } + + public getPtsMs(): number { + return this.frame.ptsMs; + } +} + +async function downloadFrame(frameWithPts: FrameWithPts): Promise { + // Safari does not support conversion to RGBA + // Chrome does not support conversion to YUV + const isSafari = !!(window as any).safari; + const options = { + format: isSafari ? 'I420' : 'RGBA', + }; + + const frame = frameWithPts.frame; + const buffer = new Uint8ClampedArray(frame.allocationSize(options as VideoFrameCopyToOptions)); + await frame.copyTo(buffer, options as VideoFrameCopyToOptions); + + return { + resolution: { + width: frame.displayWidth, + height: frame.displayHeight, + }, + format: isSafari ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES, + data: buffer, + }; +} diff --git a/ts/@live-compositor/web-wasm/src/input/input.ts b/ts/@live-compositor/web-wasm/src/input/input.ts index f98ccba7f..173b5c05d 100644 --- a/ts/@live-compositor/web-wasm/src/input/input.ts +++ b/ts/@live-compositor/web-wasm/src/input/input.ts @@ -3,10 +3,9 @@ import { CompositorEventType } from 'live-compositor'; import type { EventSender } from '../eventSender'; import type InputSource from './source'; import { Queue } from '@datastructures-js/queue'; -import type { FrameWithPts } from './decoder/h264Decoder'; import { H264Decoder } from './decoder/h264Decoder'; -import type { InputFrame } from './inputFrame'; -import { intoInputFrame } from './inputFrame'; +import { FrameRef } from './frame'; +import { assert, framerateToDurationMs } from '../utils'; export type InputState = 'waiting_for_start' | 'buffering' | 'playing' | 'finished'; @@ -18,7 +17,7 @@ export class Input { private source: InputSource; private decoder: H264Decoder; private eventSender: EventSender; - private frames: Queue; + private frames: Queue; /** * Queue PTS of the first frame */ @@ -31,7 +30,9 @@ export class Input { this.eventSender = eventSender; this.frames = new Queue(); this.decoder = new H264Decoder({ - onFrame: frame => this.frames.push(frame), + onFrame: frame => { + this.frames.push(new FrameRef(frame)); + }, }); this.source.registerCallbacks({ @@ -53,66 +54,87 @@ export class Input { }); } - public async getFrame(currentQueuePts: number): Promise { - this.enqueueChunks(); + public async getFrameRef(currentQueuePts: number): Promise { if (this.state === 'buffering') { this.handleBuffering(); + return; } if (this.state !== 'playing') { - return undefined; + return; + } + if (!this.startPtsMs) { + this.startPtsMs = currentQueuePts; } - await this.dropOldFrames(currentQueuePts); + this.dropOldFrames(currentQueuePts); + this.enqueueChunks(currentQueuePts); - const frame = this.frames.pop(); - if (!frame) { - if (!this.source.isFinished()) { - return undefined; - } + // No more chunks will be produced. Flush all the remaining frames from the decoder + if (this.source.isFinished() && this.decoder.decodeQueueSize() !== 0) { + await this.decoder.flush(); + } - if (this.decoder.decodeQueueSize() === 0) { - // Source and Decoder finished their jobs - this.handleEos(); - return undefined; - } + let frame: FrameRef | undefined; + if (this.source.isFinished() && this.frames.size() == 1) { + // Last frame is not poped by `dropOldFrames` + frame = this.frames.pop(); + } else { + frame = this.getLatestFrame(); + } - await this.decoder.flush(); - const frame = this.frames.pop(); - return frame && intoInputFrame(frame); + if (frame) { + return frame; } - return intoInputFrame(frame); + // Source received EOS & there is no more frames + if (this.source.isFinished()) { + this.handleEos(); + return; + } + + return undefined; } /** - * Removes frames older than provided `currentQueuePts` + * Retrieves latest frame and increments its reference count */ - private async dropOldFrames(currentQueuePts: number): Promise { - if (!this.startPtsMs) { - this.startPtsMs = currentQueuePts; + private getLatestFrame(): FrameRef | undefined { + const frame = this.frames.front(); + if (frame) { + frame.incrementRefCount(); + return frame; } - let frame = this.frames.front(); - while (frame && this.framePtsToQueuePts(frame.ptsMs)! < currentQueuePts) { - console.warn(`Input "${this.id}": Frame dropped. PTS ${frame.ptsMs}`); - frame.frame.close(); - this.enqueueChunks(); - this.frames.pop(); - - frame = this.frames.front(); - } + return undefined; } - private framePtsToQueuePts(framePtsMs: number): number | undefined { - if (this.startPtsMs) { - return this.startPtsMs + framePtsMs; + /** + * Removes frames older than provided `currentQueuePts` + */ + private dropOldFrames(currentQueuePts: number): void { + const targetPts = this.queuePtsToInputPts(currentQueuePts); + + const frames = this.frames.toArray(); + let minPtsDiff = Number.MAX_VALUE; + let frameIndex = -1; + for (let i = 0; i < frames.length; i++) { + const framePts = frames[i].getPtsMs(); + const diff = Math.abs(framePts - targetPts); + if (diff < minPtsDiff) { + minPtsDiff = diff; + frameIndex = i; + } } - return undefined; + for (let i = 0; i < frameIndex; i++) { + const frame = this.frames.pop(); + frame.decrementRefCount(); + } } private handleBuffering() { if (this.frames.size() < MAX_BUFFERING_SIZE) { + this.tryEnqueueChunk(); return; } @@ -133,16 +155,28 @@ export class Input { this.decoder.close(); } - private enqueueChunks() { - while ( - this.frames.size() < MAX_BUFFERING_SIZE && - this.decoder.decodeQueueSize() < MAX_BUFFERING_SIZE - ) { - const chunk = this.source.nextChunk(); - if (!chunk) { - break; - } - this.decoder.decode(chunk); + private queuePtsToInputPts(queuePts: number): number { + const startTime = assert(this.startPtsMs); + return queuePts - startTime; + } + + private tryEnqueueChunk() { + const chunk = this.source.nextChunk(); + if (chunk) { + this.decoder.decode(chunk.data); + } + } + + private enqueueChunks(currentQueuePts: number) { + const framrate = assert(this.source.getFramerate()); + const frameDuration = framerateToDurationMs(framrate); + const targetPts = this.queuePtsToInputPts(currentQueuePts) + frameDuration * MAX_BUFFERING_SIZE; + + let chunk = this.source.peekChunk(); + while (chunk && chunk.ptsMs < targetPts) { + this.decoder.decode(chunk.data); + this.source.nextChunk(); + chunk = this.source.peekChunk(); } } } diff --git a/ts/@live-compositor/web-wasm/src/input/inputFrame.ts b/ts/@live-compositor/web-wasm/src/input/inputFrame.ts deleted file mode 100644 index 0fd0689de..000000000 --- a/ts/@live-compositor/web-wasm/src/input/inputFrame.ts +++ /dev/null @@ -1,38 +0,0 @@ -import type { Frame } from '@live-compositor/browser-render'; -import { FrameFormat } from '@live-compositor/browser-render'; -import type { FrameWithPts } from './decoder/h264Decoder'; - -/** - * Represents frame produced by decoder. - * `InputFrame` has to be manually freed from the memory by calling `free()` method. Once freed it no longer can be used. - * `Queue` on tick pulls `InputFrame` for each input and once render finishes, manually frees `InputFrame`s. - */ -export type InputFrame = Frame & { - /** - * Frees `InputFrame` from memory. `InputFrame` can not be used after `free()`. - */ - free: () => void; -}; - -export async function intoInputFrame(decodedFrame: FrameWithPts): Promise { - // Safari does not support conversion to RGBA - // Chrome does not support conversion to YUV - const isSafari = !!(window as any).safari; - const options = { - format: isSafari ? 'I420' : 'RGBA', - }; - - const frame = decodedFrame.frame; - const buffer = new Uint8ClampedArray(frame.allocationSize(options as VideoFrameCopyToOptions)); - await frame.copyTo(buffer, options as VideoFrameCopyToOptions); - - return { - resolution: { - width: frame.displayWidth, - height: frame.displayHeight, - }, - format: isSafari ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES, - data: buffer, - free: () => frame.close(), - }; -} diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts index e0fde5f2c..618b6cfe0 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts @@ -1,9 +1,16 @@ import type { MP4ArrayBuffer, MP4File, MP4Info, Sample } from 'mp4box'; import MP4Box, { DataStream } from 'mp4box'; import type { SourcePayload } from '../source'; +import { assert } from '../../utils'; +import type { Framerate } from '../../compositor'; + +export type Mp4ReadyData = { + decoderConfig: VideoDecoderConfig; + framerate: Framerate; +}; export type MP4DemuxerCallbacks = { - onConfig: (config: VideoDecoderConfig) => void; + onReady: (data: Mp4ReadyData) => void; onPayload: (payload: SourcePayload) => void; }; @@ -46,11 +53,20 @@ export class MP4Demuxer { const codecDescription = this.getCodecDescription(videoTrack.id); this.samplesCount = videoTrack.nb_samples; - this.callbacks.onConfig({ + const decoderConfig = { codec: videoTrack.codec, codedWidth: videoTrack.video.width, codedHeight: videoTrack.video.height, description: codecDescription, + }; + const framerate = { + num: videoTrack.timescale, + den: 1000, + }; + + this.callbacks.onReady({ + decoderConfig, + framerate, }); this.file.setExtractionOptions(videoTrack.id); @@ -58,6 +74,8 @@ export class MP4Demuxer { } private onSamples(samples: Sample[]) { + const samplesCount = assert(this.samplesCount); + for (const sample of samples) { const chunk = new EncodedVideoChunk({ type: sample.is_sync ? 'key' : 'delta', @@ -68,7 +86,7 @@ export class MP4Demuxer { this.callbacks.onPayload({ type: 'chunk', chunk: chunk }); - if (sample.number === this.samplesCount! - 1) { + if (sample.number === samplesCount - 1) { this.callbacks.onPayload({ type: 'eos' }); } } diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts index 09cc1405b..61a1fe1e1 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts @@ -1,7 +1,10 @@ +import type { Mp4ReadyData } from './demuxer'; import { MP4Demuxer } from './demuxer'; import type InputSource from '../source'; -import type { InputSourceCallbacks, SourcePayload } from '../source'; +import type { InputSourceCallbacks, SourcePayload, VideoChunk } from '../source'; import { Queue } from '@datastructures-js/queue'; +import { assert } from '../../utils'; +import type { Framerate } from '../../compositor'; export default class MP4Source implements InputSource { private fileUrl: string; @@ -10,11 +13,13 @@ export default class MP4Source implements InputSource { private callbacks?: InputSourceCallbacks; private chunks: Queue; private eosReceived: boolean = false; + private ptsOffset?: number; + private framerate?: Framerate; public constructor(fileUrl: string) { this.fileUrl = fileUrl; this.demuxer = new MP4Demuxer({ - onConfig: config => this.callbacks?.onDecoderConfig(config), + onReady: config => this.handleOnReady(config), onPayload: payload => this.handlePayload(payload), }); this.chunks = new Queue(); @@ -42,15 +47,42 @@ export default class MP4Source implements InputSource { return this.eosReceived && this.chunks.isEmpty(); } - public nextChunk(): EncodedVideoChunk | undefined { - return this.chunks.pop(); + public getFramerate(): Framerate | undefined { + return this.framerate; + } + + public nextChunk(): VideoChunk | undefined { + const chunk = this.chunks.pop(); + return chunk && this.intoVideoChunk(chunk); + } + + public peekChunk(): VideoChunk | undefined { + const chunk = this.chunks.front(); + return chunk && this.intoVideoChunk(chunk); + } + + private handleOnReady(data: Mp4ReadyData) { + this.callbacks?.onDecoderConfig(data.decoderConfig); + this.framerate = data.framerate; } private handlePayload(payload: SourcePayload) { if (payload.type === 'chunk') { + if (!this.ptsOffset) { + this.ptsOffset = -payload.chunk.timestamp; + } this.chunks.push(payload.chunk); } else if (payload.type === 'eos') { this.eosReceived = true; } } + + private intoVideoChunk(chunk: EncodedVideoChunk): VideoChunk { + const offset = assert(this.ptsOffset); + + return { + data: chunk, + ptsMs: (offset + chunk.timestamp) / 1000, + }; + } } diff --git a/ts/@live-compositor/web-wasm/src/input/source.ts b/ts/@live-compositor/web-wasm/src/input/source.ts index 5c4ebfc5d..b3a429636 100644 --- a/ts/@live-compositor/web-wasm/src/input/source.ts +++ b/ts/@live-compositor/web-wasm/src/input/source.ts @@ -1,5 +1,11 @@ import type { RegisterInputRequest } from '@live-compositor/core'; import MP4Source from './mp4/source'; +import type { Framerate } from '../compositor'; + +export type VideoChunk = { + data: EncodedVideoChunk; + ptsMs: number; +}; export type SourcePayload = { type: 'chunk'; chunk: EncodedVideoChunk } | { type: 'eos' }; @@ -14,9 +20,13 @@ export default interface InputSource { */ start(): void; registerCallbacks(callbacks: InputSourceCallbacks): void; - // if `true` InputSource won't produce more chunks anymore + /** + * if `true` InputSource won't produce more chunks anymore + */ isFinished(): boolean; - nextChunk(): EncodedVideoChunk | undefined; + getFramerate(): Framerate | undefined; + nextChunk(): VideoChunk | undefined; + peekChunk(): VideoChunk | undefined; } export function sourceFromRequest(request: RegisterInputRequest): InputSource { diff --git a/ts/@live-compositor/web-wasm/src/queue.ts b/ts/@live-compositor/web-wasm/src/queue.ts index 7ee4324e9..391835f93 100644 --- a/ts/@live-compositor/web-wasm/src/queue.ts +++ b/ts/@live-compositor/web-wasm/src/queue.ts @@ -1,8 +1,9 @@ -import type { FrameSet, InputId, OutputId, Renderer } from '@live-compositor/browser-render'; +import type { Frame, FrameSet, InputId, OutputId, Renderer } from '@live-compositor/browser-render'; import type { Framerate } from './compositor'; import type { Input } from './input/input'; -import type { InputFrame } from './input/inputFrame'; import type { Output } from './output/output'; +import { framerateToDurationMs } from './utils'; +import type { FrameRef } from './input/frame'; export type StopQueueFn = () => void; @@ -20,7 +21,7 @@ export class Queue { } public start(): StopQueueFn { - const tickDuration = (1000 * this.framerate.den) / this.framerate.num; + const tickDuration = framerateToDurationMs(this.framerate); const queueInterval = setInterval(async () => { await this.onTick(); this.currentPts += tickDuration; @@ -61,21 +62,26 @@ export class Queue { private async onTick() { const inputs = await this.getInputFrames(); + const frames: Record = {}; + for (const inputId in inputs) { + frames[inputId] = await inputs[inputId].getFrame(); + } + const outputs = this.renderer.render({ ptsMs: this.currentPts, - frames: inputs, + frames: frames, }); this.sendOutputs(outputs); for (const input of Object.values(inputs)) { - input.free(); + input.decrementRefCount(); } } - private async getInputFrames(): Promise> { + private async getInputFrames(): Promise> { const pendingFrames = Object.entries(this.inputs).map(async ([inputId, input]) => [ inputId, - await input.getFrame(this.currentPts), + await input.getFrameRef(this.currentPts), ]); const frames = await Promise.all(pendingFrames); return Object.fromEntries(frames.filter(([_inputId, frame]) => !!frame)); diff --git a/ts/@live-compositor/web-wasm/src/utils.ts b/ts/@live-compositor/web-wasm/src/utils.ts new file mode 100644 index 000000000..b4667b026 --- /dev/null +++ b/ts/@live-compositor/web-wasm/src/utils.ts @@ -0,0 +1,13 @@ +import type { Framerate } from './compositor'; + +export function assert(value?: T): T { + if (!value) { + throw new Error('Assertion failed'); + } + + return value; +} + +export function framerateToDurationMs(framerate: Framerate): number { + return (1000 * framerate.den) / framerate.num; +}