Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
brzep committed Dec 3, 2024
1 parent d1c6d77 commit cb84214
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 45 deletions.
11 changes: 9 additions & 2 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,18 @@ pub struct Options {
pub force_gpu: bool,
pub download_root: PathBuf,
pub output_sample_rate: u32,
pub stun_servers: Arc<Vec<String>>,
pub wgpu_features: WgpuFeatures,
pub load_system_fonts: Option<bool>,
pub wgpu_ctx: Option<GraphicsContext>,
pub tokio_rt: Arc<Runtime>,
pub tokio_rt: Option<Arc<Runtime>>,
}

#[derive(Clone)]
pub struct PipelineCtx {
pub output_sample_rate: u32,
pub output_framerate: Framerate,
pub stun_servers: Arc<Vec<String>>,
pub download_dir: Arc<PathBuf>,
pub event_emitter: Arc<EventEmitter>,
pub tokio_rt: Arc<Runtime>,
Expand Down Expand Up @@ -185,6 +187,10 @@ impl Pipeline {
.join(format!("live-compositor-{}", rand::random::<u64>()));
std::fs::create_dir_all(&download_dir).map_err(InitPipelineError::CreateDownloadDir)?;

let tokio_rt = match opts.tokio_rt {
Some(tokio_rt) => tokio_rt,
None => Arc::new(Runtime::new().map_err(InitPipelineError::CreateTokioRuntime)?),
};
let event_emitter = Arc::new(EventEmitter::new());
let pipeline = Pipeline {
outputs: HashMap::new(),
Expand All @@ -196,9 +202,10 @@ impl Pipeline {
ctx: PipelineCtx {
output_sample_rate: opts.output_sample_rate,
output_framerate: opts.queue_options.output_framerate,
stun_servers: opts.stun_servers,
download_dir: download_dir.into(),
event_emitter,
tokio_rt: opts.tokio_rt,
tokio_rt,
#[cfg(feature = "vk-video")]
vulkan_ctx: preinitialized_ctx.and_then(|ctx| ctx.vulkan_ctx),
},
Expand Down
27 changes: 14 additions & 13 deletions compositor_pipeline/src/pipeline/output/whip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ pub struct WhipCtx {
output_id: OutputId,
options: WhipSenderOptions,
sample_rate: u32,
stun_servers: Arc<Vec<String>>,
request_keyframe_sender: Option<Sender<()>>,
should_close: Arc<AtomicBool>,
tokio_rt: Arc<Runtime>,
event_emitter: Arc<EventEmitter>,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -139,23 +141,21 @@ impl WhipSender {
output_id: output_id.clone(),
options: options.clone(),
sample_rate: pipeline_ctx.output_sample_rate,
stun_servers: pipeline_ctx.stun_servers.clone(),
request_keyframe_sender,
should_close: should_close.clone(),
tokio_rt: pipeline_ctx.tokio_rt.clone(),
event_emitter: pipeline_ctx.event_emitter.clone(),
};

pipeline_ctx.tokio_rt.spawn(
run_whip_sender_task(
whip_ctx,
packet_stream,
init_confirmation_sender,
pipeline_ctx.event_emitter.clone(),
)
.instrument(span!(
Level::INFO,
"WHIP sender",
output_id = output_id.to_string()
)),
run_whip_sender_task(whip_ctx, packet_stream, init_confirmation_sender).instrument(
span!(
Level::INFO,
"WHIP sender",
output_id = output_id.to_string()
),
),
);

let start_time = Instant::now();
Expand Down Expand Up @@ -197,7 +197,6 @@ async fn run_whip_sender_task(
whip_ctx: WhipCtx,
packet_stream: PacketStream,
init_confirmation_sender: oneshot::Sender<Result<(), WhipError>>,
event_emitter: Arc<EventEmitter>,
) {
let client = Arc::new(reqwest::Client::new());
let (peer_connection, video_track, audio_track) = match init_peer_connection(&whip_ctx).await {
Expand Down Expand Up @@ -274,6 +273,8 @@ async fn run_whip_sender_task(
if let Err(err) = client.delete(whip_session_url).send().await {
error!("Error while sending delete whip session request: {}", err);
}
event_emitter.emit(Event::OutputDone(output_id_clone));
whip_ctx
.event_emitter
.emit(Event::OutputDone(output_id_clone));
debug!("Closing WHIP sender thread.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use crate::{
};

use super::{WhipAudioOptions, WhipCtx, WhipError};
use std::{
env::{self, VarError},
sync::Arc,
};
use tracing::{info, warn};
use std::sync::Arc;
use webrtc::{
api::{
interceptor_registry::register_default_interceptors,
Expand All @@ -25,8 +21,6 @@ use webrtc::{
track::track_local::track_local_static_rtp::TrackLocalStaticRTP,
};

const STUN_SERVER_ENV: &str = "LIVE_COMPOSITOR_STUN_SERVERS";

pub async fn init_peer_connection(
whip_ctx: &WhipCtx,
) -> Result<
Expand Down Expand Up @@ -57,27 +51,9 @@ pub async fn init_peer_connection(
.with_interceptor_registry(registry)
.build();

let mut stun_servers_urls = vec!["stun:stun.l.google.com:19302".to_owned()];

match env::var(STUN_SERVER_ENV) {
Ok(var) => {
if var.is_empty() {
info!("Empty LIVE_COMPOSITOR_STUN_SERVERS environment variable, using default");
} else {
let env_url_list: Vec<String> = var.split(',').map(String::from).collect();
stun_servers_urls.extend(env_url_list);
info!("Using custom stun servers defined in LIVE_COMPOSITOR_STUN_SERVERS environment variable");
}
}
Err(err) => match err {
VarError::NotPresent => info!("No stun servers provided, using default"),
VarError::NotUnicode(_) => warn!("Invalid LIVE_COMPOSITOR_STUN_SERVERS environment variable, it is not a valid Unicode, using default")
},
}

let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: stun_servers_urls,
urls: whip_ctx.stun_servers.to_vec(),
..Default::default()
}],
..Default::default()
Expand Down
3 changes: 2 additions & 1 deletion integration_tests/examples/raw_channel_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ fn main() {
force_gpu: config.force_gpu,
download_root: config.download_root,
output_sample_rate: config.output_sample_rate,
stun_servers: config.stun_servers,
wgpu_features: config.required_wgpu_features,
load_system_fonts: Some(true),
wgpu_ctx: Some(ctx),
tokio_rt: Arc::new(Runtime::new().unwrap()),
tokio_rt: Some(Arc::new(Runtime::new().unwrap())),
})
.unwrap_or_else(|err| {
panic!(
Expand Down
3 changes: 2 additions & 1 deletion integration_tests/examples/raw_channel_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ fn main() {
force_gpu: config.force_gpu,
download_root: config.download_root,
output_sample_rate: config.output_sample_rate,
stun_servers: config.stun_servers,
wgpu_features: config.required_wgpu_features,
load_system_fonts: Some(true),
wgpu_ctx: Some(ctx),
tokio_rt: Arc::new(Runtime::new().unwrap()),
tokio_rt: Some(Arc::new(Runtime::new().unwrap())),
})
.unwrap_or_else(|err| {
panic!(
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/examples/whip_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn client_code() -> Result<()> {
&json!({
"type": "whip",
"endpoint_url": "https://g.webrtc.live-video.net:4443/v2/offer",
"bearer_token": "", // your Bearer token
"bearer_token": "live_415462268_xSFWGJSSxsl5YUOueD0NXaO5aailcs", // your Bearer token
"video": {
"resolution": {
"width": VIDEO_RESOLUTION.width,
Expand Down
17 changes: 17 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
use compositor_pipeline::queue::{self, QueueOptions};
use compositor_render::{web_renderer::WebRendererInitOptions, Framerate, WgpuFeatures};
use rand::Rng;
use tracing::error;

use crate::logger::FfmpegLogLevel;

Expand All @@ -23,6 +24,7 @@ pub struct Config {
pub download_root: PathBuf,
pub queue_options: QueueOptions,
pub output_sample_rate: u32,
pub stun_servers: Arc<Vec<String>>,
pub required_wgpu_features: WgpuFeatures,
}

Expand Down Expand Up @@ -194,6 +196,20 @@ fn try_read_config() -> Result<Config, String> {
Err(_) => None,
};

let default_stun_servers = Arc::new(vec!["stun:stun.l.google.com:19302".to_string()]);

let stun_servers = match env::var("LIVE_COMPOSITOR_STUN_SERVERS") {
Ok(var) => {
if var.is_empty() {
error!("empty stun servers env");
Arc::new(Vec::new())
} else {
Arc::new(var.split(',').map(String::from).collect())
}
}
Err(_) => default_stun_servers,
};

let config = Config {
instance_id,
api_port,
Expand All @@ -218,6 +234,7 @@ fn try_read_config() -> Result<Config, String> {
},
download_root,
output_sample_rate,
stun_servers,
required_wgpu_features,
};
Ok(config)
Expand Down
4 changes: 3 additions & 1 deletion src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl ApiState {
force_gpu,
download_root,
output_sample_rate,
stun_servers,
required_wgpu_features,
..
} = config.clone();
Expand All @@ -52,10 +53,11 @@ impl ApiState {
force_gpu,
download_root,
output_sample_rate,
stun_servers,
wgpu_features: required_wgpu_features,
wgpu_ctx: None,
load_system_fonts: Some(true),
tokio_rt: runtime,
tokio_rt: Some(runtime),
})?;
Ok((
ApiState {
Expand Down

0 comments on commit cb84214

Please sign in to comment.