From fd0367ff18d23a56e60792b2dea4c915fa1e6146 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Fri, 11 Jun 2021 09:22:20 -0700 Subject: [PATCH] push parser helper to accumulate NALs for dholroyd/h264-reader#4 --- src/lib.rs | 1 + src/nal/mod.rs | 26 +++- src/push/mod.rs | 311 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 337 insertions(+), 1 deletion(-) create mode 100644 src/push/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 3c5f814..0a237db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod rbsp; pub mod annexb; pub mod nal; pub mod avcc; +pub mod push; /// Contextual data that needs to be tracked between evaluations of different portions of H264 /// syntax. diff --git a/src/nal/mod.rs b/src/nal/mod.rs index d4a325e..385a0fc 100644 --- a/src/nal/mod.rs +++ b/src/nal/mod.rs @@ -110,11 +110,12 @@ pub enum UnitTypeError { ValueOutOfRange(u8) } -#[derive(Copy,Clone)] +#[derive(Copy,Clone,PartialEq,Eq)] pub struct NalHeader ( u8 ); #[derive(Debug)] pub enum NalHeaderError { + /// The most significant bit of the header, called `forbidden_zero_bit`, was set to 1. ForbiddenZeroBit, } impl NalHeader { @@ -149,6 +150,29 @@ impl fmt::Debug for NalHeader { } } +/// A partially- or completely-buffered encoded NAL. Must have at least one byte (the header). +pub trait Nal { + type BufRead: std::io::BufRead; + + /// Returns whether the NAL is completely buffered. + fn complete(&self) -> bool; + + /// Returns the NAL header or error if corrupt. + fn header(&self) -> Result; + + /// Reads the bytes in NAL form (including the header byte and + /// any emulation-prevention-three-bytes) as a [`std::io::BufRead`]. + /// If the NAL is incomplete, reads may fail with `ErrorKind::WouldBlock`. + fn reader(&self) -> Self::BufRead; + + // /// Reads the bytes in RBSP form (skipping header byte and + // /// emulation-prevention-three-bytes). + // fn rbsp_bytes(&self) -> rbsp::ByteReader { ... } + + // /// Reads bits within the RBSP form. + // fn rbsp_bits(&self) -> rbsp::BitReader> { ... } +} + #[derive(Debug)] enum NalSwitchState { Start, diff --git a/src/push/mod.rs b/src/push/mod.rs new file mode 100644 index 0000000..68a3986 --- /dev/null +++ b/src/push/mod.rs @@ -0,0 +1,311 @@ +//! Push parsing of encoded NALs. + +use crate::nal::{NalHeader, NalHeaderError}; + +use super::nal::Nal; + +/// [`AccumulatedNalHandler`]'s interest in receiving additional callbacks on a NAL. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum NalInterest { + /// If this NAL is incomplete, buffer it and call again later. + /// No effect if the NAL is complete. + Buffer, + + /// Don't buffer any more of this NAL or make any more calls on it. + Ignore, +} + +/// Handles a partially- or fully-buffered NAL, returning whether additional calls are desired. +pub type AccumulatedNalHandler<'a> = dyn FnMut(AccumulatedNal<'_>) -> NalInterest + 'a; + +/// NAL accumulator for push parsers. +/// +/// This is meant to be used by parsers for a specific format: Annex B, AVC, MPEG-TS, RTP, etc. +/// Accumulates NALs in an internal buffer and delegates to a caller-supplied handler. +pub struct NalAccumulator<'a> { + buf: Vec, + nal_handler: &'a mut AccumulatedNalHandler<'a>, + interest: NalInterest, +} +impl<'a> NalAccumulator<'a> { + /// Creates a new accumulator which delegates to the given `nal_handler` on every push. + /// `nal_handler` always the NAL from the beginning. + pub fn new(nal_handler: &'a mut AccumulatedNalHandler<'a>) -> Self { + Self { + buf: Vec::new(), + interest: NalInterest::Buffer, + nal_handler, + } + } + + /// Pushes a portion of a NAL. + /// + /// This calls `nal_handler` unless any of the following are true: + /// * a previous call on the same NAL returned [`NalInterest::Ignore`]. + /// * the NAL is totally empty. + /// * the push adds no bytes and end is false. + pub fn push(&mut self, buf: &[u8], end: bool) { + if self.interest != NalInterest::Ignore + && (!buf.is_empty() || (!self.buf.is_empty() && end)) { + // Call the NAL handler. Avoid copying unless necessary. + match (self.nal_handler)(AccumulatedNal::new([&*self.buf, buf], end)) { + NalInterest::Buffer if !end => self.buf.extend_from_slice(buf), + NalInterest::Ignore if !end => self.interest = NalInterest::Ignore, + _ => {}, + } + } + if end { + self.buf.clear(); + self.interest = NalInterest::Buffer; + } + } +} +impl<'a> std::fmt::Debug for NalAccumulator<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NalAccumulator") + .field("interest", &self.interest) + .field("buf", &self.buf) + .field("header", &self.buf.first().map(|&h| NalHeader::new(h))) + .finish() + } +} + +/// A partially- or completely-buffered [`Nal`] passed from [`NalAccumulator`] to an +/// [`AccumulatedNalHandler`]. +#[derive(Clone, Eq, PartialEq)] +pub struct AccumulatedNal<'a> { + header: u8, + complete: bool, + + // Non-empty chunks. + first_chunk: &'a [u8], + second_chunk: Option<&'a [u8]>, +} +impl<'a> AccumulatedNal<'a> { + /// The caller must ensure there is at least byte in one of the chunks. + fn new(chunks: [&'a [u8]; 2], complete: bool) -> Self { + if let Some(&header) = chunks[0].first() { + Self { + header, + first_chunk: chunks[0], + second_chunk: if chunks[1].is_empty() { None } else { Some(chunks[1]) }, + complete, + } + } else { + Self { + header: *chunks[1].first().expect("AccumulatedNal must be non-empty"), + first_chunk: chunks[1], + second_chunk: None, + complete, + } + } + } +} +impl<'a> Nal for AccumulatedNal<'a> { + type BufRead = AccumulatedNalReader<'a>; + + #[inline] + fn complete(&self) -> bool { + self.complete + } + + #[inline] + fn header(&self) -> Result { + NalHeader::new(self.header) + } + + #[inline] + fn reader(&self) -> Self::BufRead { + AccumulatedNalReader { + cur: self.first_chunk, + next: self.second_chunk, + complete: self.complete, + } + } +} +impl<'a> std::fmt::Debug for AccumulatedNal<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AccumulatedNal") + .field("header", &self.header()) + + // TODO: it'd be nicer to display these as a single hex string. + .field("first_chunk", &self.first_chunk) + .field("second_chunk", &self.second_chunk) + .field("complete", &self.complete) + .finish() + } +} + +/// A reader through the bytes of a partially- or fully-buffered [`AccumulatedNal`] +/// that implements [`std::io::BufRead`]. +/// +/// Returns `ErrorKind::WouldBlock` on reaching the end of partially-buffered NAL. +/// Construct via `Nal::reader`. +pub struct AccumulatedNalReader<'a> { + /// Empty only if at end. + cur: &'a [u8], + next: Option<&'a [u8]>, + complete: bool, +} +impl<'a> std::io::Read for AccumulatedNalReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let len; + if buf.is_empty() { + len = 0; + } else if self.cur.is_empty() && !self.complete { + return Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, + "reached end of partially-buffered NAL")); + } else if buf.len() < self.cur.len() { + len = buf.len(); + let (copy, keep) = self.cur.split_at(len); + buf.copy_from_slice(copy); + self.cur = keep; + } else { + len = self.cur.len(); + buf[..len].copy_from_slice(self.cur); + self.cur = self.next.take().unwrap_or(&[]); + } + Ok(len) + } +} +impl<'a> std::io::BufRead for AccumulatedNalReader<'a> { + fn fill_buf(&mut self) -> std::io::Result<&[u8]> { + if self.cur.is_empty() && !self.complete { + return Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, + "reached end of partially-buffered NAL")); + } + Ok(self.cur) + } + fn consume(&mut self, amt: usize) { + self.cur = &self.cur[amt..]; + if self.cur.is_empty() { + self.cur = self.next.take().unwrap_or(&[]); + } + } +} + +#[cfg(test)] +mod test { + use std::io::{BufRead, Read}; + + use super::*; + + /// Tests `AccumulatedNal` in isolation. + #[test] + fn nal() { + fn common<'a>(first_chunk: &'a [u8], second_chunk: &'a [u8], complete: bool) -> AccumulatedNal<'a> { + let nal = AccumulatedNal::new([first_chunk, second_chunk], complete); + assert_eq!(NalHeader::new(0b0101_0001).unwrap(), nal.header().unwrap()); + + // Try the Read impl. + let mut r = nal.reader(); + let mut buf = [0u8; 5]; + r.read_exact(&mut buf).unwrap(); + assert_eq!(&buf[..], &[0b0101_0001, 1, 2, 3, 4]); + if complete { + assert_eq!(r.read(&mut buf[..]).unwrap(), 0); + + // Also try read_to_end. + let mut buf = Vec::new(); + nal.reader().read_to_end(&mut buf).unwrap(); + assert_eq!(buf, &[0b0101_0001, 1, 2, 3, 4]); + } else { + assert_eq!(r.read(&mut buf[..]).unwrap_err().kind(), std::io::ErrorKind::WouldBlock); + } + + // Let the caller try the BufRead impl. + nal + } + + // Incomplete NAL with a first chunk only. + let nal = common(&[0b0101_0001, 1, 2, 3, 4], &[], false); + let mut r = nal.reader(); + assert_eq!(r.fill_buf().unwrap(), &[0b0101_0001, 1, 2, 3, 4]); + r.consume(1); + assert_eq!(r.fill_buf().unwrap(), &[1, 2, 3, 4]); + r.consume(4); + assert_eq!(r.fill_buf().unwrap_err().kind(), std::io::ErrorKind::WouldBlock); + + // Incomplete NAL with a second chunk only. + let nal = common(&[], &[0b0101_0001, 1, 2, 3, 4], false); + let mut r = nal.reader(); + assert_eq!(r.fill_buf().unwrap(), &[0b0101_0001, 1, 2, 3, 4]); + r.consume(1); + assert_eq!(r.fill_buf().unwrap(), &[1, 2, 3, 4]); + r.consume(4); + assert_eq!(r.fill_buf().unwrap_err().kind(), std::io::ErrorKind::WouldBlock); + + // Incomplete NAL with both chunks. + let nal = common(&[0b0101_0001], &[1, 2, 3, 4], false); + let mut r = nal.reader(); + assert_eq!(r.fill_buf().unwrap(), &[0b0101_0001]); + r.consume(1); + assert_eq!(r.fill_buf().unwrap(), &[1, 2, 3, 4]); + r.consume(3); + assert_eq!(r.fill_buf().unwrap(), &[4]); + r.consume(1); + assert_eq!(r.fill_buf().unwrap_err().kind(), std::io::ErrorKind::WouldBlock); + + // Complete NAL with first chunk only. + let nal = common(&[0b0101_0001, 1, 2, 3, 4], &[], true); + let mut r = nal.reader(); + assert_eq!(r.fill_buf().unwrap(), &[0b0101_0001, 1, 2, 3, 4]); + r.consume(1); + assert_eq!(r.fill_buf().unwrap(), &[1, 2, 3, 4]); + r.consume(4); + assert_eq!(r.fill_buf().unwrap(), &[]); + } + + #[test] + fn accumulate() { + // Try buffering everything. + let mut nals = Vec::new(); + let mut handler = |nal: AccumulatedNal<'_>| { + if nal.complete() { + let mut buf = Vec::new(); + nal.reader().read_to_end(&mut buf).unwrap(); + nals.push(buf); + } + NalInterest::Buffer + }; + let mut accumulator = NalAccumulator::new(&mut handler); + accumulator.push(&[], false); + accumulator.push(&[], true); + accumulator.push(&[0b0101_0001, 1], true); + accumulator.push(&[0b0101_0001], false); + accumulator.push(&[], false); + accumulator.push(&[2], true); + accumulator.push(&[0b0101_0001], false); + accumulator.push(&[], false); + accumulator.push(&[3], false); + accumulator.push(&[], true); + assert_eq!(nals, &[ + &[0b0101_0001, 1][..], + &[0b0101_0001, 2][..], + &[0b0101_0001, 3][..], + ]); + + // Try buffering nothing and see what's given on the first push. + nals.clear(); + let mut handler = |nal: AccumulatedNal<'_>| { + nals.push(nal.reader().fill_buf().unwrap().to_owned()); + NalInterest::Ignore + }; + let mut accumulator = NalAccumulator::new(&mut handler); + accumulator.push(&[], false); + accumulator.push(&[], true); + accumulator.push(&[0b0101_0001, 1], true); + accumulator.push(&[0b0101_0001], false); + accumulator.push(&[], false); + accumulator.push(&[2], true); + accumulator.push(&[0b0101_0001], false); + accumulator.push(&[], false); + accumulator.push(&[3], false); + accumulator.push(&[], true); + assert_eq!(nals, &[ + &[0b0101_0001, 1][..], + &[0b0101_0001][..], + &[0b0101_0001][..], + ]); + } +}