Skip to content

Commit

Permalink
feat: 支持bilibili 直播推送
Browse files Browse the repository at this point in the history
  • Loading branch information
zuiyu1998 committed Mar 28, 2024
1 parent 62be8df commit 991ad55
Show file tree
Hide file tree
Showing 7 changed files with 681 additions and 16 deletions.
5 changes: 5 additions & 0 deletions protocol/rtmp/src/netconnection/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,17 @@ impl NetConnection {
pub async fn write_create_stream(
&mut self,
transaction_id: &f64,
search: Option<String>,
) -> Result<(), NetConnectionError> {
self.amf0_writer
.write_string(&String::from("createStream"))?;
self.amf0_writer.write_number(transaction_id)?;
self.amf0_writer.write_null()?;

if let Some(ref search) = search {
self.amf0_writer.write_string(search)?;
}

self.write_chunk().await
}

Expand Down
23 changes: 20 additions & 3 deletions protocol/rtmp/src/netstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,18 @@ impl NetStreamWriter {
&mut self,
transaction_id: &f64,
stream_name: &String,
search: Option<String>,
) -> Result<(), NetStreamError> {
self.amf0_writer
.write_string(&String::from("releaseStream"))?;
self.amf0_writer.write_number(transaction_id)?;
self.amf0_writer.write_null()?;
self.amf0_writer.write_string(stream_name)?;

if let Some(search) = search {
self.amf0_writer.write_string(&search)?;
} else {
self.amf0_writer.write_string(stream_name)?;
}

self.write_chunk(0).await
}
Expand All @@ -103,11 +109,17 @@ impl NetStreamWriter {
&mut self,
transaction_id: &f64,
stream_name: &String,
search: Option<String>,
) -> Result<(), NetStreamError> {
self.amf0_writer.write_string(&String::from("FCPublish"))?;
self.amf0_writer.write_number(transaction_id)?;
self.amf0_writer.write_null()?;
self.amf0_writer.write_string(stream_name)?;

if let Some(search) = search {
self.amf0_writer.write_string(&search)?;
} else {
self.amf0_writer.write_string(stream_name)?;
}

self.write_chunk(0).await
}
Expand Down Expand Up @@ -145,12 +157,17 @@ impl NetStreamWriter {
transaction_id: &f64,
stream_name: &String,
stream_type: &String,
search: Option<String>,
) -> Result<(), NetStreamError> {
self.amf0_writer.write_string(&String::from("publish"))?;
self.amf0_writer.write_number(transaction_id)?;
self.amf0_writer.write_null()?;
self.amf0_writer.write_string(stream_name)?;

if let Some(search) = search {
self.amf0_writer.write_string(&search)?;
}
self.amf0_writer.write_string(stream_type)?;
self.amf0_writer.write_string(stream_name)?;

self.write_chunk(0).await
}
Expand Down
23 changes: 17 additions & 6 deletions protocol/rtmp/src/relay/bilibili_push_client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use {
super::errors::ClientError,
crate::{
session::client_session::{ClientSession, ClientType},
utils::RtmpUrlParser,
},
crate::{session::bilibili_client_session::BiliBiliClientSession, utils::RtmpUrlParser},
streamhub::{
define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender},
stream::StreamIdentifier,
},
tokio::net::TcpStream,
};

//c -> s SetChunkSize
//c -> s connect
//s -> c window
//s -> c setPeer
//c -> s amf0 command releaseStream
//c -> s amf0 command FCPublish
//s -> c _result
//c -> s amf0 command publish

pub struct BiliBiliPushClient {
url: String,
client_event_consumer: BroadcastEventReceiver,
Expand Down Expand Up @@ -58,14 +64,19 @@ impl BiliBiliPushClient {
);
let stream = TcpStream::connect(address.clone()).await?;

let mut client_session = ClientSession::new(
let search = rtmp
.query
.as_ref()
.and_then(|query| Some(format!("?{}", query)));

let mut client_session = BiliBiliClientSession::new(
stream,
ClientType::Publish,
address.clone(),
app_name,
stream_name,
self.channel_event_producer.clone(),
0,
search,
);

tokio::spawn(async move {
Expand Down
Loading

0 comments on commit 991ad55

Please sign in to comment.