From 802a23412f7ba13bcdf8b45d23083808c26af16a Mon Sep 17 00:00:00 2001 From: noituri Date: Mon, 18 Nov 2024 14:10:04 +0100 Subject: [PATCH 1/4] [wasm] Handle EOS and inputs with different framerates --- .../web-wasm/src/input/decoder/h264Decoder.ts | 62 ++++---- .../web-wasm/src/input/input.ts | 138 ++++++++++++++---- .../web-wasm/src/input/inputFrame.ts | 38 +++++ .../web-wasm/src/input/mp4/demuxer.ts | 27 ++-- .../web-wasm/src/input/mp4/mp4box.d.ts | 2 + .../web-wasm/src/input/mp4/source.ts | 56 +++---- .../web-wasm/src/input/source.ts | 12 +- ts/@live-compositor/web-wasm/src/queue.ts | 5 +- .../src/examples/MP4Player.tsx | 22 ++- 9 files changed, 255 insertions(+), 107 deletions(-) create mode 100644 ts/@live-compositor/web-wasm/src/input/inputFrame.ts diff --git a/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts b/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts index a7d48f071..df8a23b38 100644 --- a/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts +++ b/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts @@ -1,22 +1,22 @@ -import { Queue } from '@datastructures-js/queue'; +export type FrameWithPts = { + frame: Omit; + ptsMs: number; +}; -const MAX_DECODED_FRAMES = 3; +export type H264DecoderProps = { + onFrame: (frame: FrameWithPts) => void; +}; export class H264Decoder { - private chunks: Queue; - private frames: Queue; private decoder: VideoDecoder; + private ptsOffset?: number; - public constructor() { - this.chunks = new Queue(); - this.frames = new Queue(); + public constructor(props: H264DecoderProps) { // TODO(noituri): Use web workers this.decoder = new VideoDecoder({ - output: frame => { - this.frames.push(frame); - }, + output: videoFrame => props.onFrame(this.intoFrameWithPts(videoFrame)), error: error => { - console.error(`MP4Decoder error: ${error}`); + console.error(`H264Decoder error: ${error}`); }, }); } @@ -25,29 +25,31 @@ export class H264Decoder { this.decoder.configure(config); } - public enqueueChunk(chunk: EncodedVideoChunk) { - this.chunks.push(chunk); - this.decodeChunks(); + public decode(chunk: EncodedVideoChunk) { + this.decoder.decode(chunk); } - /** - * Returns decoded video frames. Frames have to be manually freed from memory - */ - public getFrame(): VideoFrame | undefined { - this.decodeChunks(); - return this.frames.pop(); + public decodeQueueSize(): number { + return this.decoder.decodeQueueSize; } - private decodeChunks() { - while ( - this.frames.size() < MAX_DECODED_FRAMES && - this.decoder.decodeQueueSize < MAX_DECODED_FRAMES - ) { - const chunk = this.chunks.pop(); - if (!chunk) { - break; - } - this.decoder.decode(chunk); + private intoFrameWithPts(videoFrame: VideoFrame): FrameWithPts { + if (!this.ptsOffset) { + this.ptsOffset = -videoFrame.timestamp; } + + return { + frame: videoFrame, + // TODO(noituri): Handle pts roller + ptsMs: (this.ptsOffset! + videoFrame.timestamp) / 1000, + }; + } + + public async flush(): Promise { + await this.decoder.flush(); + } + + public close() { + this.decoder.close(); } } diff --git a/ts/@live-compositor/web-wasm/src/input/input.ts b/ts/@live-compositor/web-wasm/src/input/input.ts index b8c756a33..f98ccba7f 100644 --- a/ts/@live-compositor/web-wasm/src/input/input.ts +++ b/ts/@live-compositor/web-wasm/src/input/input.ts @@ -1,33 +1,42 @@ -import type { Frame, InputId } from '@live-compositor/browser-render'; +import type { InputId } from '@live-compositor/browser-render'; import { CompositorEventType } from 'live-compositor'; import type { EventSender } from '../eventSender'; import type InputSource from './source'; - -/** - * Represents frame produced by decoder. - * `InputFrame` has to be manually freed from the memory by calling `free()` method. Once freed it no longer can be used. - * `Queue` on tick pulls `InputFrame` for each input and once render finishes, manually frees `InputFrame`s. - */ -export type InputFrame = Frame & { - /** - * Frees `InputFrame` from memory. `InputFrame` can not be used after `free()`. - */ - free: () => void; -}; +import { Queue } from '@datastructures-js/queue'; +import type { FrameWithPts } from './decoder/h264Decoder'; +import { H264Decoder } from './decoder/h264Decoder'; +import type { InputFrame } from './inputFrame'; +import { intoInputFrame } from './inputFrame'; export type InputState = 'waiting_for_start' | 'buffering' | 'playing' | 'finished'; +const MAX_BUFFERING_SIZE = 3; + export class Input { private id: InputId; - private source: InputSource; private state: InputState; + private source: InputSource; + private decoder: H264Decoder; private eventSender: EventSender; + private frames: Queue; + /** + * Queue PTS of the first frame + */ + private startPtsMs?: number; public constructor(id: InputId, source: InputSource, eventSender: EventSender) { this.id = id; this.state = 'waiting_for_start'; this.source = source; this.eventSender = eventSender; + this.frames = new Queue(); + this.decoder = new H264Decoder({ + onFrame: frame => this.frames.push(frame), + }); + + this.source.registerCallbacks({ + onDecoderConfig: config => this.decoder.configure(config), + }); } public start() { @@ -35,6 +44,7 @@ export class Input { console.warn(`Tried to start an already started input "${this.id}"`); return; } + this.source.start(); this.state = 'buffering'; this.eventSender.sendEvent({ @@ -43,18 +53,96 @@ export class Input { }); } - public async getFrame(): Promise { - const frame = await this.source.getFrame(); - // TODO(noituri): Handle this better - if (frame && this.state === 'buffering') { - this.state = 'playing'; - this.eventSender.sendEvent({ - type: CompositorEventType.VIDEO_INPUT_PLAYING, - inputId: this.id, - }); + public async getFrame(currentQueuePts: number): Promise { + this.enqueueChunks(); + if (this.state === 'buffering') { + this.handleBuffering(); + } + if (this.state !== 'playing') { + return undefined; + } + + await this.dropOldFrames(currentQueuePts); + + const frame = this.frames.pop(); + if (!frame) { + if (!this.source.isFinished()) { + return undefined; + } + + if (this.decoder.decodeQueueSize() === 0) { + // Source and Decoder finished their jobs + this.handleEos(); + return undefined; + } + + await this.decoder.flush(); + const frame = this.frames.pop(); + return frame && intoInputFrame(frame); + } + + return intoInputFrame(frame); + } + + /** + * Removes frames older than provided `currentQueuePts` + */ + private async dropOldFrames(currentQueuePts: number): Promise { + if (!this.startPtsMs) { + this.startPtsMs = currentQueuePts; } - // TODO(noituri): Handle EOS - return frame; + let frame = this.frames.front(); + while (frame && this.framePtsToQueuePts(frame.ptsMs)! < currentQueuePts) { + console.warn(`Input "${this.id}": Frame dropped. PTS ${frame.ptsMs}`); + frame.frame.close(); + this.enqueueChunks(); + this.frames.pop(); + + frame = this.frames.front(); + } + } + + private framePtsToQueuePts(framePtsMs: number): number | undefined { + if (this.startPtsMs) { + return this.startPtsMs + framePtsMs; + } + + return undefined; + } + + private handleBuffering() { + if (this.frames.size() < MAX_BUFFERING_SIZE) { + return; + } + + this.state = 'playing'; + this.eventSender.sendEvent({ + type: CompositorEventType.VIDEO_INPUT_PLAYING, + inputId: this.id, + }); + } + + private handleEos() { + this.state = 'finished'; + this.eventSender.sendEvent({ + type: CompositorEventType.VIDEO_INPUT_EOS, + inputId: this.id, + }); + + this.decoder.close(); + } + + private enqueueChunks() { + while ( + this.frames.size() < MAX_BUFFERING_SIZE && + this.decoder.decodeQueueSize() < MAX_BUFFERING_SIZE + ) { + const chunk = this.source.nextChunk(); + if (!chunk) { + break; + } + this.decoder.decode(chunk); + } } } diff --git a/ts/@live-compositor/web-wasm/src/input/inputFrame.ts b/ts/@live-compositor/web-wasm/src/input/inputFrame.ts new file mode 100644 index 000000000..0fd0689de --- /dev/null +++ b/ts/@live-compositor/web-wasm/src/input/inputFrame.ts @@ -0,0 +1,38 @@ +import type { Frame } from '@live-compositor/browser-render'; +import { FrameFormat } from '@live-compositor/browser-render'; +import type { FrameWithPts } from './decoder/h264Decoder'; + +/** + * Represents frame produced by decoder. + * `InputFrame` has to be manually freed from the memory by calling `free()` method. Once freed it no longer can be used. + * `Queue` on tick pulls `InputFrame` for each input and once render finishes, manually frees `InputFrame`s. + */ +export type InputFrame = Frame & { + /** + * Frees `InputFrame` from memory. `InputFrame` can not be used after `free()`. + */ + free: () => void; +}; + +export async function intoInputFrame(decodedFrame: FrameWithPts): Promise { + // Safari does not support conversion to RGBA + // Chrome does not support conversion to YUV + const isSafari = !!(window as any).safari; + const options = { + format: isSafari ? 'I420' : 'RGBA', + }; + + const frame = decodedFrame.frame; + const buffer = new Uint8ClampedArray(frame.allocationSize(options as VideoFrameCopyToOptions)); + await frame.copyTo(buffer, options as VideoFrameCopyToOptions); + + return { + resolution: { + width: frame.displayWidth, + height: frame.displayHeight, + }, + format: isSafari ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES, + data: buffer, + free: () => frame.close(), + }; +} diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts index 47338d63a..e0fde5f2c 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts @@ -1,17 +1,19 @@ import type { MP4ArrayBuffer, MP4File, MP4Info, Sample } from 'mp4box'; import MP4Box, { DataStream } from 'mp4box'; +import type { SourcePayload } from '../source'; -export type OnConfig = (config: VideoDecoderConfig) => void; - -export type OnChunk = (chunk: EncodedVideoChunk) => void; +export type MP4DemuxerCallbacks = { + onConfig: (config: VideoDecoderConfig) => void; + onPayload: (payload: SourcePayload) => void; +}; export class MP4Demuxer { private file: MP4File; private fileOffset: number; - private onConfig: OnConfig; - private onChunk: OnChunk; + private callbacks: MP4DemuxerCallbacks; + private samplesCount?: number; - public constructor(props: { onConfig: OnConfig; onChunk: OnChunk }) { + public constructor(callbacks: MP4DemuxerCallbacks) { this.file = MP4Box.createFile(); this.file.onReady = info => this.onReady(info); this.file.onSamples = (_id, _user, samples) => this.onSamples(samples); @@ -20,8 +22,7 @@ export class MP4Demuxer { }; this.fileOffset = 0; - this.onConfig = props.onConfig; - this.onChunk = props.onChunk; + this.callbacks = callbacks; } public demux(data: ArrayBuffer) { @@ -43,7 +44,9 @@ export class MP4Demuxer { const videoTrack = info.videoTracks[0]; const codecDescription = this.getCodecDescription(videoTrack.id); - this.onConfig({ + this.samplesCount = videoTrack.nb_samples; + + this.callbacks.onConfig({ codec: videoTrack.codec, codedWidth: videoTrack.video.width, codedHeight: videoTrack.video.height, @@ -63,7 +66,11 @@ export class MP4Demuxer { data: sample.data, }); - this.onChunk(chunk); + this.callbacks.onPayload({ type: 'chunk', chunk: chunk }); + + if (sample.number === this.samplesCount! - 1) { + this.callbacks.onPayload({ type: 'eos' }); + } } } diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/mp4box.d.ts b/ts/@live-compositor/web-wasm/src/input/mp4/mp4box.d.ts index 5859feb47..df1131c38 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/mp4box.d.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/mp4box.d.ts @@ -33,6 +33,7 @@ declare module 'mp4box' { duration: number; bitrate: number; codec: string; + nb_samples: number; } export interface MP4VideoTrack extends MP4MediaTrack { @@ -61,6 +62,7 @@ declare module 'mp4box' { } export interface Sample { + number: number; timescale: number; data: ArrayBuffer; size: number; diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts index 61a864b28..09cc1405b 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts @@ -1,28 +1,23 @@ -import { FrameFormat } from '@live-compositor/browser-render'; import { MP4Demuxer } from './demuxer'; -import { H264Decoder } from '../decoder/h264Decoder'; -import type { InputFrame } from '../input'; import type InputSource from '../source'; +import type { InputSourceCallbacks, SourcePayload } from '../source'; +import { Queue } from '@datastructures-js/queue'; export default class MP4Source implements InputSource { private fileUrl: string; private fileData?: ArrayBuffer; private demuxer: MP4Demuxer; - private decoder: H264Decoder; - private frameFormat: VideoPixelFormat; + private callbacks?: InputSourceCallbacks; + private chunks: Queue; + private eosReceived: boolean = false; public constructor(fileUrl: string) { this.fileUrl = fileUrl; this.demuxer = new MP4Demuxer({ - onConfig: config => this.decoder.configure(config), - onChunk: chunk => this.decoder.enqueueChunk(chunk), + onConfig: config => this.callbacks?.onDecoderConfig(config), + onPayload: payload => this.handlePayload(payload), }); - this.decoder = new H264Decoder(); - - // Safari does not support conversion to RGBA - // Chrome does not support conversion to YUV - const isSafari = !!(window as any).safari; - this.frameFormat = isSafari ? 'I420' : 'RGBA'; + this.chunks = new Queue(); } public async init(): Promise { @@ -39,26 +34,23 @@ export default class MP4Source implements InputSource { this.demuxer.flush(); } - public async getFrame(): Promise { - const frame = this.decoder.getFrame(); - if (!frame) { - return undefined; - } + public registerCallbacks(callbacks: InputSourceCallbacks): void { + this.callbacks = callbacks; + } - const options = { - format: this.frameFormat, - }; - const buffer = new Uint8ClampedArray(frame.allocationSize(options as VideoFrameCopyToOptions)); - await frame.copyTo(buffer, options as VideoFrameCopyToOptions); + public isFinished(): boolean { + return this.eosReceived && this.chunks.isEmpty(); + } - return { - resolution: { - width: frame.displayWidth, - height: frame.displayHeight, - }, - format: this.frameFormat == 'I420' ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES, - data: buffer, - free: () => frame.close(), - }; + public nextChunk(): EncodedVideoChunk | undefined { + return this.chunks.pop(); + } + + private handlePayload(payload: SourcePayload) { + if (payload.type === 'chunk') { + this.chunks.push(payload.chunk); + } else if (payload.type === 'eos') { + this.eosReceived = true; + } } } diff --git a/ts/@live-compositor/web-wasm/src/input/source.ts b/ts/@live-compositor/web-wasm/src/input/source.ts index 13269be1f..5c4ebfc5d 100644 --- a/ts/@live-compositor/web-wasm/src/input/source.ts +++ b/ts/@live-compositor/web-wasm/src/input/source.ts @@ -1,14 +1,22 @@ import type { RegisterInputRequest } from '@live-compositor/core'; -import type { InputFrame } from './input'; import MP4Source from './mp4/source'; +export type SourcePayload = { type: 'chunk'; chunk: EncodedVideoChunk } | { type: 'eos' }; + +export type InputSourceCallbacks = { + onDecoderConfig: (config: VideoDecoderConfig) => void; +}; + export default interface InputSource { init(): Promise; /** * Starts input processing. `init()` has to be called beforehand. */ start(): void; - getFrame(): Promise; + registerCallbacks(callbacks: InputSourceCallbacks): void; + // if `true` InputSource won't produce more chunks anymore + isFinished(): boolean; + nextChunk(): EncodedVideoChunk | undefined; } export function sourceFromRequest(request: RegisterInputRequest): InputSource { diff --git a/ts/@live-compositor/web-wasm/src/queue.ts b/ts/@live-compositor/web-wasm/src/queue.ts index 390026b36..7ee4324e9 100644 --- a/ts/@live-compositor/web-wasm/src/queue.ts +++ b/ts/@live-compositor/web-wasm/src/queue.ts @@ -1,6 +1,7 @@ import type { FrameSet, InputId, OutputId, Renderer } from '@live-compositor/browser-render'; import type { Framerate } from './compositor'; -import type { Input, InputFrame } from './input/input'; +import type { Input } from './input/input'; +import type { InputFrame } from './input/inputFrame'; import type { Output } from './output/output'; export type StopQueueFn = () => void; @@ -74,7 +75,7 @@ export class Queue { private async getInputFrames(): Promise> { const pendingFrames = Object.entries(this.inputs).map(async ([inputId, input]) => [ inputId, - await input.getFrame(), + await input.getFrame(this.currentPts), ]); const frames = await Promise.all(pendingFrames); return Object.fromEntries(frames.filter(([_inputId, frame]) => !!frame)); diff --git a/ts/examples/vite-browser-render/src/examples/MP4Player.tsx b/ts/examples/vite-browser-render/src/examples/MP4Player.tsx index 39cddd8d9..fef317767 100644 --- a/ts/examples/vite-browser-render/src/examples/MP4Player.tsx +++ b/ts/examples/vite-browser-render/src/examples/MP4Player.tsx @@ -30,21 +30,31 @@ function Scene() { const inputs = useInputStreams(); const inputState = inputs['bunny_video']?.videoState; - if (inputState !== 'playing') { + if (inputState === 'playing') { + return ( + + + + Playing MP4 file + + + ); + } + + if (inputState === 'finished') { return ( - Loading MP4 file + Finished playing MP4 file ); } return ( - - - - Playing MP4 file + + + Loading MP4 file ); From 5d64deecdae40c2f12b5d9789f4b0e1002ce1638 Mon Sep 17 00:00:00 2001 From: noituri Date: Wed, 27 Nov 2024 23:37:39 +0100 Subject: [PATCH 2/4] Review remarks --- .../web-wasm/src/input/frame.ts | 81 +++++++++++ .../web-wasm/src/input/input.ts | 134 +++++++++++------- .../web-wasm/src/input/inputFrame.ts | 38 ----- .../web-wasm/src/input/mp4/demuxer.ts | 24 +++- .../web-wasm/src/input/mp4/source.ts | 40 +++++- .../web-wasm/src/input/source.ts | 14 +- ts/@live-compositor/web-wasm/src/queue.ts | 20 ++- ts/@live-compositor/web-wasm/src/utils.ts | 13 ++ 8 files changed, 260 insertions(+), 104 deletions(-) create mode 100644 ts/@live-compositor/web-wasm/src/input/frame.ts delete mode 100644 ts/@live-compositor/web-wasm/src/input/inputFrame.ts create mode 100644 ts/@live-compositor/web-wasm/src/utils.ts diff --git a/ts/@live-compositor/web-wasm/src/input/frame.ts b/ts/@live-compositor/web-wasm/src/input/frame.ts new file mode 100644 index 000000000..7eeed00c8 --- /dev/null +++ b/ts/@live-compositor/web-wasm/src/input/frame.ts @@ -0,0 +1,81 @@ +import type { Frame } from '@live-compositor/browser-render'; +import { FrameFormat } from '@live-compositor/browser-render'; +import type { FrameWithPts } from './decoder/h264Decoder'; +import { assert } from '../utils'; + +/** + * Represents frame produced by decoder. + * Memory has to be manually managed by incrementing reference count on `FrameRef` copy and decrementing it once it's no longer used + * `Input` manages memory in `getFrameRef()` + * `Queue` on tick pulls `FrameRef` for each input and once render finishes, decrements the ref count + */ +export class FrameRef { + private frame: FrameWithPts; + private refCount: number; + private downloadedFrame?: Frame; + + public constructor(frame: FrameWithPts) { + this.frame = frame; + this.refCount = 1; + } + + /** + * Increments reference count. Should be called every time the reference is copied. + */ + public incrementRefCount(): void { + assert(this.refCount > 0); + this.refCount++; + } + + /** + * Decrements reference count. If reference count reaches 0, `FrameWithPts` is freed from the memory. + * It's unsafe to use the returned frame after `decrementRefCount()` call. + * Should be used after we're sure we no longer need the frame. + */ + public decrementRefCount(): void { + assert(this.refCount > 0); + + this.refCount--; + if (this.refCount === 0) { + this.frame.frame.close(); + } + } + + /** + * Returns underlying frame. Fails if frame was freed from memory. + */ + public async getFrame(): Promise { + assert(this.refCount > 0); + + if (!this.downloadedFrame) { + this.downloadedFrame = await downloadFrame(this.frame); + } + return this.downloadedFrame; + } + + public getPtsMs(): number { + return this.frame.ptsMs; + } +} + +async function downloadFrame(frameWithPts: FrameWithPts): Promise { + // Safari does not support conversion to RGBA + // Chrome does not support conversion to YUV + const isSafari = !!(window as any).safari; + const options = { + format: isSafari ? 'I420' : 'RGBA', + }; + + const frame = frameWithPts.frame; + const buffer = new Uint8ClampedArray(frame.allocationSize(options as VideoFrameCopyToOptions)); + await frame.copyTo(buffer, options as VideoFrameCopyToOptions); + + return { + resolution: { + width: frame.displayWidth, + height: frame.displayHeight, + }, + format: isSafari ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES, + data: buffer, + }; +} diff --git a/ts/@live-compositor/web-wasm/src/input/input.ts b/ts/@live-compositor/web-wasm/src/input/input.ts index f98ccba7f..173b5c05d 100644 --- a/ts/@live-compositor/web-wasm/src/input/input.ts +++ b/ts/@live-compositor/web-wasm/src/input/input.ts @@ -3,10 +3,9 @@ import { CompositorEventType } from 'live-compositor'; import type { EventSender } from '../eventSender'; import type InputSource from './source'; import { Queue } from '@datastructures-js/queue'; -import type { FrameWithPts } from './decoder/h264Decoder'; import { H264Decoder } from './decoder/h264Decoder'; -import type { InputFrame } from './inputFrame'; -import { intoInputFrame } from './inputFrame'; +import { FrameRef } from './frame'; +import { assert, framerateToDurationMs } from '../utils'; export type InputState = 'waiting_for_start' | 'buffering' | 'playing' | 'finished'; @@ -18,7 +17,7 @@ export class Input { private source: InputSource; private decoder: H264Decoder; private eventSender: EventSender; - private frames: Queue; + private frames: Queue; /** * Queue PTS of the first frame */ @@ -31,7 +30,9 @@ export class Input { this.eventSender = eventSender; this.frames = new Queue(); this.decoder = new H264Decoder({ - onFrame: frame => this.frames.push(frame), + onFrame: frame => { + this.frames.push(new FrameRef(frame)); + }, }); this.source.registerCallbacks({ @@ -53,66 +54,87 @@ export class Input { }); } - public async getFrame(currentQueuePts: number): Promise { - this.enqueueChunks(); + public async getFrameRef(currentQueuePts: number): Promise { if (this.state === 'buffering') { this.handleBuffering(); + return; } if (this.state !== 'playing') { - return undefined; + return; + } + if (!this.startPtsMs) { + this.startPtsMs = currentQueuePts; } - await this.dropOldFrames(currentQueuePts); + this.dropOldFrames(currentQueuePts); + this.enqueueChunks(currentQueuePts); - const frame = this.frames.pop(); - if (!frame) { - if (!this.source.isFinished()) { - return undefined; - } + // No more chunks will be produced. Flush all the remaining frames from the decoder + if (this.source.isFinished() && this.decoder.decodeQueueSize() !== 0) { + await this.decoder.flush(); + } - if (this.decoder.decodeQueueSize() === 0) { - // Source and Decoder finished their jobs - this.handleEos(); - return undefined; - } + let frame: FrameRef | undefined; + if (this.source.isFinished() && this.frames.size() == 1) { + // Last frame is not poped by `dropOldFrames` + frame = this.frames.pop(); + } else { + frame = this.getLatestFrame(); + } - await this.decoder.flush(); - const frame = this.frames.pop(); - return frame && intoInputFrame(frame); + if (frame) { + return frame; } - return intoInputFrame(frame); + // Source received EOS & there is no more frames + if (this.source.isFinished()) { + this.handleEos(); + return; + } + + return undefined; } /** - * Removes frames older than provided `currentQueuePts` + * Retrieves latest frame and increments its reference count */ - private async dropOldFrames(currentQueuePts: number): Promise { - if (!this.startPtsMs) { - this.startPtsMs = currentQueuePts; + private getLatestFrame(): FrameRef | undefined { + const frame = this.frames.front(); + if (frame) { + frame.incrementRefCount(); + return frame; } - let frame = this.frames.front(); - while (frame && this.framePtsToQueuePts(frame.ptsMs)! < currentQueuePts) { - console.warn(`Input "${this.id}": Frame dropped. PTS ${frame.ptsMs}`); - frame.frame.close(); - this.enqueueChunks(); - this.frames.pop(); - - frame = this.frames.front(); - } + return undefined; } - private framePtsToQueuePts(framePtsMs: number): number | undefined { - if (this.startPtsMs) { - return this.startPtsMs + framePtsMs; + /** + * Removes frames older than provided `currentQueuePts` + */ + private dropOldFrames(currentQueuePts: number): void { + const targetPts = this.queuePtsToInputPts(currentQueuePts); + + const frames = this.frames.toArray(); + let minPtsDiff = Number.MAX_VALUE; + let frameIndex = -1; + for (let i = 0; i < frames.length; i++) { + const framePts = frames[i].getPtsMs(); + const diff = Math.abs(framePts - targetPts); + if (diff < minPtsDiff) { + minPtsDiff = diff; + frameIndex = i; + } } - return undefined; + for (let i = 0; i < frameIndex; i++) { + const frame = this.frames.pop(); + frame.decrementRefCount(); + } } private handleBuffering() { if (this.frames.size() < MAX_BUFFERING_SIZE) { + this.tryEnqueueChunk(); return; } @@ -133,16 +155,28 @@ export class Input { this.decoder.close(); } - private enqueueChunks() { - while ( - this.frames.size() < MAX_BUFFERING_SIZE && - this.decoder.decodeQueueSize() < MAX_BUFFERING_SIZE - ) { - const chunk = this.source.nextChunk(); - if (!chunk) { - break; - } - this.decoder.decode(chunk); + private queuePtsToInputPts(queuePts: number): number { + const startTime = assert(this.startPtsMs); + return queuePts - startTime; + } + + private tryEnqueueChunk() { + const chunk = this.source.nextChunk(); + if (chunk) { + this.decoder.decode(chunk.data); + } + } + + private enqueueChunks(currentQueuePts: number) { + const framrate = assert(this.source.getFramerate()); + const frameDuration = framerateToDurationMs(framrate); + const targetPts = this.queuePtsToInputPts(currentQueuePts) + frameDuration * MAX_BUFFERING_SIZE; + + let chunk = this.source.peekChunk(); + while (chunk && chunk.ptsMs < targetPts) { + this.decoder.decode(chunk.data); + this.source.nextChunk(); + chunk = this.source.peekChunk(); } } } diff --git a/ts/@live-compositor/web-wasm/src/input/inputFrame.ts b/ts/@live-compositor/web-wasm/src/input/inputFrame.ts deleted file mode 100644 index 0fd0689de..000000000 --- a/ts/@live-compositor/web-wasm/src/input/inputFrame.ts +++ /dev/null @@ -1,38 +0,0 @@ -import type { Frame } from '@live-compositor/browser-render'; -import { FrameFormat } from '@live-compositor/browser-render'; -import type { FrameWithPts } from './decoder/h264Decoder'; - -/** - * Represents frame produced by decoder. - * `InputFrame` has to be manually freed from the memory by calling `free()` method. Once freed it no longer can be used. - * `Queue` on tick pulls `InputFrame` for each input and once render finishes, manually frees `InputFrame`s. - */ -export type InputFrame = Frame & { - /** - * Frees `InputFrame` from memory. `InputFrame` can not be used after `free()`. - */ - free: () => void; -}; - -export async function intoInputFrame(decodedFrame: FrameWithPts): Promise { - // Safari does not support conversion to RGBA - // Chrome does not support conversion to YUV - const isSafari = !!(window as any).safari; - const options = { - format: isSafari ? 'I420' : 'RGBA', - }; - - const frame = decodedFrame.frame; - const buffer = new Uint8ClampedArray(frame.allocationSize(options as VideoFrameCopyToOptions)); - await frame.copyTo(buffer, options as VideoFrameCopyToOptions); - - return { - resolution: { - width: frame.displayWidth, - height: frame.displayHeight, - }, - format: isSafari ? FrameFormat.YUV_BYTES : FrameFormat.RGBA_BYTES, - data: buffer, - free: () => frame.close(), - }; -} diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts index e0fde5f2c..618b6cfe0 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts @@ -1,9 +1,16 @@ import type { MP4ArrayBuffer, MP4File, MP4Info, Sample } from 'mp4box'; import MP4Box, { DataStream } from 'mp4box'; import type { SourcePayload } from '../source'; +import { assert } from '../../utils'; +import type { Framerate } from '../../compositor'; + +export type Mp4ReadyData = { + decoderConfig: VideoDecoderConfig; + framerate: Framerate; +}; export type MP4DemuxerCallbacks = { - onConfig: (config: VideoDecoderConfig) => void; + onReady: (data: Mp4ReadyData) => void; onPayload: (payload: SourcePayload) => void; }; @@ -46,11 +53,20 @@ export class MP4Demuxer { const codecDescription = this.getCodecDescription(videoTrack.id); this.samplesCount = videoTrack.nb_samples; - this.callbacks.onConfig({ + const decoderConfig = { codec: videoTrack.codec, codedWidth: videoTrack.video.width, codedHeight: videoTrack.video.height, description: codecDescription, + }; + const framerate = { + num: videoTrack.timescale, + den: 1000, + }; + + this.callbacks.onReady({ + decoderConfig, + framerate, }); this.file.setExtractionOptions(videoTrack.id); @@ -58,6 +74,8 @@ export class MP4Demuxer { } private onSamples(samples: Sample[]) { + const samplesCount = assert(this.samplesCount); + for (const sample of samples) { const chunk = new EncodedVideoChunk({ type: sample.is_sync ? 'key' : 'delta', @@ -68,7 +86,7 @@ export class MP4Demuxer { this.callbacks.onPayload({ type: 'chunk', chunk: chunk }); - if (sample.number === this.samplesCount! - 1) { + if (sample.number === samplesCount - 1) { this.callbacks.onPayload({ type: 'eos' }); } } diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts index 09cc1405b..61a1fe1e1 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts @@ -1,7 +1,10 @@ +import type { Mp4ReadyData } from './demuxer'; import { MP4Demuxer } from './demuxer'; import type InputSource from '../source'; -import type { InputSourceCallbacks, SourcePayload } from '../source'; +import type { InputSourceCallbacks, SourcePayload, VideoChunk } from '../source'; import { Queue } from '@datastructures-js/queue'; +import { assert } from '../../utils'; +import type { Framerate } from '../../compositor'; export default class MP4Source implements InputSource { private fileUrl: string; @@ -10,11 +13,13 @@ export default class MP4Source implements InputSource { private callbacks?: InputSourceCallbacks; private chunks: Queue; private eosReceived: boolean = false; + private ptsOffset?: number; + private framerate?: Framerate; public constructor(fileUrl: string) { this.fileUrl = fileUrl; this.demuxer = new MP4Demuxer({ - onConfig: config => this.callbacks?.onDecoderConfig(config), + onReady: config => this.handleOnReady(config), onPayload: payload => this.handlePayload(payload), }); this.chunks = new Queue(); @@ -42,15 +47,42 @@ export default class MP4Source implements InputSource { return this.eosReceived && this.chunks.isEmpty(); } - public nextChunk(): EncodedVideoChunk | undefined { - return this.chunks.pop(); + public getFramerate(): Framerate | undefined { + return this.framerate; + } + + public nextChunk(): VideoChunk | undefined { + const chunk = this.chunks.pop(); + return chunk && this.intoVideoChunk(chunk); + } + + public peekChunk(): VideoChunk | undefined { + const chunk = this.chunks.front(); + return chunk && this.intoVideoChunk(chunk); + } + + private handleOnReady(data: Mp4ReadyData) { + this.callbacks?.onDecoderConfig(data.decoderConfig); + this.framerate = data.framerate; } private handlePayload(payload: SourcePayload) { if (payload.type === 'chunk') { + if (!this.ptsOffset) { + this.ptsOffset = -payload.chunk.timestamp; + } this.chunks.push(payload.chunk); } else if (payload.type === 'eos') { this.eosReceived = true; } } + + private intoVideoChunk(chunk: EncodedVideoChunk): VideoChunk { + const offset = assert(this.ptsOffset); + + return { + data: chunk, + ptsMs: (offset + chunk.timestamp) / 1000, + }; + } } diff --git a/ts/@live-compositor/web-wasm/src/input/source.ts b/ts/@live-compositor/web-wasm/src/input/source.ts index 5c4ebfc5d..b3a429636 100644 --- a/ts/@live-compositor/web-wasm/src/input/source.ts +++ b/ts/@live-compositor/web-wasm/src/input/source.ts @@ -1,5 +1,11 @@ import type { RegisterInputRequest } from '@live-compositor/core'; import MP4Source from './mp4/source'; +import type { Framerate } from '../compositor'; + +export type VideoChunk = { + data: EncodedVideoChunk; + ptsMs: number; +}; export type SourcePayload = { type: 'chunk'; chunk: EncodedVideoChunk } | { type: 'eos' }; @@ -14,9 +20,13 @@ export default interface InputSource { */ start(): void; registerCallbacks(callbacks: InputSourceCallbacks): void; - // if `true` InputSource won't produce more chunks anymore + /** + * if `true` InputSource won't produce more chunks anymore + */ isFinished(): boolean; - nextChunk(): EncodedVideoChunk | undefined; + getFramerate(): Framerate | undefined; + nextChunk(): VideoChunk | undefined; + peekChunk(): VideoChunk | undefined; } export function sourceFromRequest(request: RegisterInputRequest): InputSource { diff --git a/ts/@live-compositor/web-wasm/src/queue.ts b/ts/@live-compositor/web-wasm/src/queue.ts index 7ee4324e9..391835f93 100644 --- a/ts/@live-compositor/web-wasm/src/queue.ts +++ b/ts/@live-compositor/web-wasm/src/queue.ts @@ -1,8 +1,9 @@ -import type { FrameSet, InputId, OutputId, Renderer } from '@live-compositor/browser-render'; +import type { Frame, FrameSet, InputId, OutputId, Renderer } from '@live-compositor/browser-render'; import type { Framerate } from './compositor'; import type { Input } from './input/input'; -import type { InputFrame } from './input/inputFrame'; import type { Output } from './output/output'; +import { framerateToDurationMs } from './utils'; +import type { FrameRef } from './input/frame'; export type StopQueueFn = () => void; @@ -20,7 +21,7 @@ export class Queue { } public start(): StopQueueFn { - const tickDuration = (1000 * this.framerate.den) / this.framerate.num; + const tickDuration = framerateToDurationMs(this.framerate); const queueInterval = setInterval(async () => { await this.onTick(); this.currentPts += tickDuration; @@ -61,21 +62,26 @@ export class Queue { private async onTick() { const inputs = await this.getInputFrames(); + const frames: Record = {}; + for (const inputId in inputs) { + frames[inputId] = await inputs[inputId].getFrame(); + } + const outputs = this.renderer.render({ ptsMs: this.currentPts, - frames: inputs, + frames: frames, }); this.sendOutputs(outputs); for (const input of Object.values(inputs)) { - input.free(); + input.decrementRefCount(); } } - private async getInputFrames(): Promise> { + private async getInputFrames(): Promise> { const pendingFrames = Object.entries(this.inputs).map(async ([inputId, input]) => [ inputId, - await input.getFrame(this.currentPts), + await input.getFrameRef(this.currentPts), ]); const frames = await Promise.all(pendingFrames); return Object.fromEntries(frames.filter(([_inputId, frame]) => !!frame)); diff --git a/ts/@live-compositor/web-wasm/src/utils.ts b/ts/@live-compositor/web-wasm/src/utils.ts new file mode 100644 index 000000000..b4667b026 --- /dev/null +++ b/ts/@live-compositor/web-wasm/src/utils.ts @@ -0,0 +1,13 @@ +import type { Framerate } from './compositor'; + +export function assert(value?: T): T { + if (!value) { + throw new Error('Assertion failed'); + } + + return value; +} + +export function framerateToDurationMs(framerate: Framerate): number { + return (1000 * framerate.den) / framerate.num; +} From 30717fad1d60ca2aa620e4095959c5d00b6226f5 Mon Sep 17 00:00:00 2001 From: noituri Date: Mon, 2 Dec 2024 14:56:17 +0100 Subject: [PATCH 3/4] Review suggestions --- .../web-wasm/src/input/decoder/h264Decoder.ts | 4 +- .../web-wasm/src/input/input.ts | 41 ++++++++++--------- .../web-wasm/src/input/mp4/demuxer.ts | 4 +- .../web-wasm/src/input/mp4/source.ts | 6 +-- ts/@live-compositor/web-wasm/src/queue.ts | 8 ++-- ts/@live-compositor/web-wasm/src/utils.ts | 4 +- 6 files changed, 33 insertions(+), 34 deletions(-) diff --git a/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts b/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts index df8a23b38..43deb6761 100644 --- a/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts +++ b/ts/@live-compositor/web-wasm/src/input/decoder/h264Decoder.ts @@ -34,14 +34,14 @@ export class H264Decoder { } private intoFrameWithPts(videoFrame: VideoFrame): FrameWithPts { - if (!this.ptsOffset) { + if (this.ptsOffset === undefined) { this.ptsOffset = -videoFrame.timestamp; } return { frame: videoFrame, // TODO(noituri): Handle pts roller - ptsMs: (this.ptsOffset! + videoFrame.timestamp) / 1000, + ptsMs: (this.ptsOffset + videoFrame.timestamp) / 1000, }; } diff --git a/ts/@live-compositor/web-wasm/src/input/input.ts b/ts/@live-compositor/web-wasm/src/input/input.ts index 173b5c05d..dfe8d3417 100644 --- a/ts/@live-compositor/web-wasm/src/input/input.ts +++ b/ts/@live-compositor/web-wasm/src/input/input.ts @@ -62,7 +62,7 @@ export class Input { if (this.state !== 'playing') { return; } - if (!this.startPtsMs) { + if (this.startPtsMs === undefined) { this.startPtsMs = currentQueuePts; } @@ -109,26 +109,27 @@ export class Input { } /** - * Removes frames older than provided `currentQueuePts` + * Finds frame with PTS closest to `currentQueuePts` and removes frames older than it */ private dropOldFrames(currentQueuePts: number): void { - const targetPts = this.queuePtsToInputPts(currentQueuePts); + if (this.frames.isEmpty()) { + return; + } const frames = this.frames.toArray(); - let minPtsDiff = Number.MAX_VALUE; - let frameIndex = -1; - for (let i = 0; i < frames.length; i++) { - const framePts = frames[i].getPtsMs(); - const diff = Math.abs(framePts - targetPts); - if (diff < minPtsDiff) { - minPtsDiff = diff; - frameIndex = i; - } - } + const targetPts = this.queuePtsToInputPts(currentQueuePts); + + const targetFrame = frames.reduce((prevFrame, frame) => { + const prevPtsDiff = Math.abs(prevFrame.getPtsMs() - targetPts); + const currPtsDiff = Math.abs(frame.getPtsMs() - targetPts); + return prevPtsDiff < currPtsDiff ? prevFrame : frame + }); - for (let i = 0; i < frameIndex; i++) { - const frame = this.frames.pop(); - frame.decrementRefCount(); + for (const frame of frames) { + if (frame.getPtsMs() < targetFrame.getPtsMs()) { + frame.decrementRefCount(); + this.frames.pop(); + } } } @@ -156,8 +157,8 @@ export class Input { } private queuePtsToInputPts(queuePts: number): number { - const startTime = assert(this.startPtsMs); - return queuePts - startTime; + assert(this.startPtsMs !== undefined); + return queuePts - this.startPtsMs; } private tryEnqueueChunk() { @@ -168,7 +169,9 @@ export class Input { } private enqueueChunks(currentQueuePts: number) { - const framrate = assert(this.source.getFramerate()); + const framrate = this.source.getFramerate(); + assert(framrate); + const frameDuration = framerateToDurationMs(framrate); const targetPts = this.queuePtsToInputPts(currentQueuePts) + frameDuration * MAX_BUFFERING_SIZE; diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts index 618b6cfe0..438696ce1 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts @@ -74,7 +74,7 @@ export class MP4Demuxer { } private onSamples(samples: Sample[]) { - const samplesCount = assert(this.samplesCount); + assert(this.samplesCount !== undefined); for (const sample of samples) { const chunk = new EncodedVideoChunk({ @@ -86,7 +86,7 @@ export class MP4Demuxer { this.callbacks.onPayload({ type: 'chunk', chunk: chunk }); - if (sample.number === samplesCount - 1) { + if (sample.number === this.samplesCount - 1) { this.callbacks.onPayload({ type: 'eos' }); } } diff --git a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts index 61a1fe1e1..9a2259808 100644 --- a/ts/@live-compositor/web-wasm/src/input/mp4/source.ts +++ b/ts/@live-compositor/web-wasm/src/input/mp4/source.ts @@ -68,7 +68,7 @@ export default class MP4Source implements InputSource { private handlePayload(payload: SourcePayload) { if (payload.type === 'chunk') { - if (!this.ptsOffset) { + if (this.ptsOffset === undefined) { this.ptsOffset = -payload.chunk.timestamp; } this.chunks.push(payload.chunk); @@ -78,11 +78,11 @@ export default class MP4Source implements InputSource { } private intoVideoChunk(chunk: EncodedVideoChunk): VideoChunk { - const offset = assert(this.ptsOffset); + assert(this.ptsOffset !== undefined); return { data: chunk, - ptsMs: (offset + chunk.timestamp) / 1000, + ptsMs: (this.ptsOffset + chunk.timestamp) / 1000, }; } } diff --git a/ts/@live-compositor/web-wasm/src/queue.ts b/ts/@live-compositor/web-wasm/src/queue.ts index 391835f93..07e3ac929 100644 --- a/ts/@live-compositor/web-wasm/src/queue.ts +++ b/ts/@live-compositor/web-wasm/src/queue.ts @@ -1,4 +1,4 @@ -import type { Frame, FrameSet, InputId, OutputId, Renderer } from '@live-compositor/browser-render'; +import type { FrameSet, InputId, OutputId, Renderer } from '@live-compositor/browser-render'; import type { Framerate } from './compositor'; import type { Input } from './input/input'; import type { Output } from './output/output'; @@ -62,10 +62,8 @@ export class Queue { private async onTick() { const inputs = await this.getInputFrames(); - const frames: Record = {}; - for (const inputId in inputs) { - frames[inputId] = await inputs[inputId].getFrame(); - } + const pendingFrames = Object.entries(inputs).map(async ([inputId, input]) => [inputId, await input.getFrame()]); + const frames = Object.fromEntries(await Promise.all(pendingFrames)); const outputs = this.renderer.render({ ptsMs: this.currentPts, diff --git a/ts/@live-compositor/web-wasm/src/utils.ts b/ts/@live-compositor/web-wasm/src/utils.ts index b4667b026..6288225ca 100644 --- a/ts/@live-compositor/web-wasm/src/utils.ts +++ b/ts/@live-compositor/web-wasm/src/utils.ts @@ -1,11 +1,9 @@ import type { Framerate } from './compositor'; -export function assert(value?: T): T { +export function assert(value: T): asserts value { if (!value) { throw new Error('Assertion failed'); } - - return value; } export function framerateToDurationMs(framerate: Framerate): number { From fd00e03d1a54415178d527d5e90385c7c67284bc Mon Sep 17 00:00:00 2001 From: noituri Date: Tue, 3 Dec 2024 12:34:29 +0100 Subject: [PATCH 4/4] Lint --- ts/@live-compositor/web-wasm/src/input/input.ts | 2 +- ts/@live-compositor/web-wasm/src/queue.ts | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ts/@live-compositor/web-wasm/src/input/input.ts b/ts/@live-compositor/web-wasm/src/input/input.ts index dfe8d3417..0e63432c4 100644 --- a/ts/@live-compositor/web-wasm/src/input/input.ts +++ b/ts/@live-compositor/web-wasm/src/input/input.ts @@ -122,7 +122,7 @@ export class Input { const targetFrame = frames.reduce((prevFrame, frame) => { const prevPtsDiff = Math.abs(prevFrame.getPtsMs() - targetPts); const currPtsDiff = Math.abs(frame.getPtsMs() - targetPts); - return prevPtsDiff < currPtsDiff ? prevFrame : frame + return prevPtsDiff < currPtsDiff ? prevFrame : frame; }); for (const frame of frames) { diff --git a/ts/@live-compositor/web-wasm/src/queue.ts b/ts/@live-compositor/web-wasm/src/queue.ts index 07e3ac929..dc294a726 100644 --- a/ts/@live-compositor/web-wasm/src/queue.ts +++ b/ts/@live-compositor/web-wasm/src/queue.ts @@ -62,7 +62,10 @@ export class Queue { private async onTick() { const inputs = await this.getInputFrames(); - const pendingFrames = Object.entries(inputs).map(async ([inputId, input]) => [inputId, await input.getFrame()]); + const pendingFrames = Object.entries(inputs).map(async ([inputId, input]) => [ + inputId, + await input.getFrame(), + ]); const frames = Object.fromEntries(await Promise.all(pendingFrames)); const outputs = this.renderer.render({