Skip to content

Commit

Permalink
fix RTMP publish single AAC from ffmpeg client.
Browse files Browse the repository at this point in the history
  • Loading branch information
suzp1984 committed Apr 12, 2024
1 parent d3a9bbf commit e74c496
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 17 deletions.
19 changes: 13 additions & 6 deletions library/bytesio/src/bytesio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ pub struct UdpIO {

impl UdpIO {
pub async fn new(remote_domain: String, remote_port: u16, local_port: u16) -> Option<Self> {
let remote_address = format!("{remote_domain}:{remote_port}");
let remote_address = if remote_domain == "localhost" {
format!("127.0.0.1:{remote_port}")
} else {
format!("{remote_domain}:{remote_port}")
};
log::info!("remote address: {}", remote_address);
let local_address = format!("0.0.0.0:{local_port}");
if let Ok(local_socket) = UdpSocket::bind(local_address).await {
if let Ok(remote_socket_addr) = remote_address.parse::<SocketAddr>() {
if let Err(err) = local_socket.connect(remote_socket_addr).await {
log::info!("connect to remote udp socket error: {}", err);
}

return Some(Self {
socket: local_socket,
});
} else {
log::error!("remote_address parse error: {:?}", remote_address);
}
return Some(Self {
socket: local_socket,
});
}

None
Expand Down Expand Up @@ -75,7 +82,7 @@ impl TNetIO for UdpIO {
Ok(data) => data,
Err(err) => Err(BytesIOError {
value: BytesIOErrorValue::TimeoutError(err),
})
}),
}
}

Expand Down Expand Up @@ -120,7 +127,7 @@ impl TNetIO for TcpIO {
Ok(data) => data,
Err(err) => Err(BytesIOError {
value: BytesIOErrorValue::TimeoutError(err),
})
}),
}
}

Expand Down
5 changes: 4 additions & 1 deletion library/container/flv/src/demuxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ impl FlvAudioTagDemuxer {
if tag_header.sound_format == SoundFormat::AAC as u8 {
match tag_header.aac_packet_type {
aac_packet_type::AAC_SEQHDR => {
self.aac_processor.audio_specific_config_load()?;
if self.aac_processor.bytes_reader.len() >= 2 {
self.aac_processor.audio_specific_config_load()?;
}

return Ok(FlvDemuxerAudioData::new());
}
aac_packet_type::AAC_RAW => {
Expand Down
42 changes: 39 additions & 3 deletions library/container/flv/src/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,38 @@ use {
bytesio::bytes_writer::BytesWriter,
};

const FLV_HEADER: [u8; 9] = [
const FLV_HEADER_AV: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x05, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
const FLV_HEADER_JUST_AUDIO: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x04, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
const FLV_HEADER_JUST_VIDEO: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x01, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
const FLV_HEADER_EMPTY_AV: [u8; 9] = [
0x46, // 'F'
0x4c, //'L'
0x56, //'V'
0x01, //version
0x00, //00000101 audio tag and video tag
0x00, 0x00, 0x00, 0x09, //flv header size
]; // 9
pub const HEADER_LENGTH: u32 = 11;
pub struct FlvMuxer {
pub writer: BytesWriter,
Expand All @@ -29,8 +53,20 @@ impl FlvMuxer {
}
}

pub fn write_flv_header(&mut self) -> Result<(), FlvMuxerError> {
self.writer.write(&FLV_HEADER)?;
pub fn write_flv_header(
&mut self,
has_audio: bool,
has_video: bool,
) -> Result<(), FlvMuxerError> {
if has_audio && has_video {
self.writer.write(&FLV_HEADER_AV)?;
} else if has_audio {
self.writer.write(&FLV_HEADER_JUST_AUDIO)?;
} else if has_video {
self.writer.write(&FLV_HEADER_JUST_VIDEO)?;
} else {
self.writer.write(&FLV_HEADER_EMPTY_AV)?;
}
Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions protocol/hls/src/flv2hls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ impl Flv2HlsRemuxer {
dts = data.dts;
pid = self.audio_pid;
payload.extend_from_slice(&data.data[..]);

if dts - self.last_ts_dts >= self.duration * 1000 {
self.need_new_segment = true;
}
}
_ => return Ok(()),
}
Expand Down
59 changes: 55 additions & 4 deletions protocol/httpflv/src/httpflv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ use {
xflv::muxer::{FlvMuxer, HEADER_LENGTH},
};

use rtmp::{chunk::define::csid_type::AUDIO, remuxer::rtmp_cooker::RtmpCooker};

pub struct HttpFlv {
app_name: String,
stream_name: String,

muxer: FlvMuxer,

has_audio: bool,
has_video: bool,
has_send_header: bool,

event_producer: StreamHubEventSender,
data_receiver: FrameDataReceiver,
/* now used for subscriber session */
Expand All @@ -52,6 +58,9 @@ impl HttpFlv {
app_name,
stream_name,
muxer: FlvMuxer::new(),
has_audio: false,
has_video: false,
has_send_header: false,
data_receiver,
statistic_data_sender: None,
event_producer,
Expand All @@ -70,14 +79,56 @@ impl HttpFlv {
}

pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> {
self.muxer.write_flv_header()?;
self.muxer.write_previous_tag_size(0)?;

self.flush_response_data()?;
let mut retry_count = 0;

let mut max_av_frame_num_to_guess_av = 0;
// the first av frames are sequence configs, must be cached;
let mut cached_frames = Vec::new();
//write flv body
loop {
if let Some(data) = self.data_receiver.recv().await {
if !self.has_send_header {
max_av_frame_num_to_guess_av += 1;

match data {
FrameData::Audio {
timestamp: _,
data: _,
} => {
self.has_audio = true;
cached_frames.push(data);
}
FrameData::Video {
timestamp: _,
data: _,
} => {
self.has_video = true;
cached_frames.push(data);
}
FrameData::MetaData {
timestamp: _,
data: _,
} => cached_frames.push(data),
_ => {}
}

if (self.has_audio && self.has_video) || max_av_frame_num_to_guess_av > 10 {
self.has_send_header = true;
self.muxer
.write_flv_header(self.has_audio, self.has_video)?;
self.muxer.write_previous_tag_size(0)?;

self.flush_response_data()?;

for frame in &cached_frames {
self.write_flv_tag(frame.clone())?;
}
cached_frames.clear();
}

continue;
}

if let Err(err) = self.write_flv_tag(data) {
if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value {
if err_in.is_disconnected() {
Expand Down
8 changes: 5 additions & 3 deletions protocol/rtmp/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub struct Cache {

impl Cache {
pub fn new(gop_num: usize, statistic_data_sender: Option<StatisticDataSender>) -> Self {

Cache {
metadata: metadata::MetaData::new(),
metadata_timestamp: 0,
Expand Down Expand Up @@ -78,7 +77,10 @@ impl Cache {
let mut reader = BytesReader::new(chunk_body.clone());
let tag_header = AudioTagHeader::unmarshal(&mut reader)?;

if tag_header.sound_format == define::SoundFormat::AAC as u8
let remain_bytes = reader.extract_remaining_bytes();

if remain_bytes.len() >= 2
&& tag_header.sound_format == define::SoundFormat::AAC as u8
&& tag_header.aac_packet_type == define::aac_packet_type::AAC_SEQHDR
{
self.audio_seq = chunk_body.clone();
Expand All @@ -88,7 +90,7 @@ impl Cache {
let mut aac_processor = Mpeg4AacProcessor::default();

let aac = aac_processor
.extend_data(reader.extract_remaining_bytes())
.extend_data(remain_bytes)
.audio_specific_config_load()?;

let statistic_audio_codec = StatisticData::AudioCodec {
Expand Down

0 comments on commit e74c496

Please sign in to comment.