diff --git a/bin/input/Cargo.toml b/bin/input/Cargo.toml index 444137ce..1d062808 100644 --- a/bin/input/Cargo.toml +++ b/bin/input/Cargo.toml @@ -5,15 +5,17 @@ edition = "2021" [dependencies] anyhow = "1.0.82" +chrono = "0.4.38" clap = "4.5.4" enigo = "0.2.1" env_logger = "0.11.3" log = "0.4.21" moq-native = { git = "https://github.com/kixelated/moq-rs", version = "0.1.0" } moq-transport = { git = "https://github.com/kixelated/moq-rs", version = "0.5.0" } +rand = "0.8.5" serde = { version="1.0.202" , features = ["derive"]} serde_json = "1.0.117" tokio = "1.37.0" tracing = "0.1.40" tracing-subscriber = "0.3.18" -url = "2.5.0" +url = "2.5.0" \ No newline at end of file diff --git a/bin/input/src/cli.rs b/bin/input/src/cli.rs index e19e099d..1cd5cb43 100644 --- a/bin/input/src/cli.rs +++ b/bin/input/src/cli.rs @@ -5,7 +5,7 @@ use url::Url; #[derive(Parser, Clone, Debug)] pub struct Config { /// Listen for UDP packets on the given address. - #[arg(long, default_value = "[::]:0")] + #[arg(long, default_value = "[::]:8080")] pub bind: net::SocketAddr, /// Connect to the given URL starting with https:// diff --git a/bin/input/src/input.rs b/bin/input/src/input.rs index c7cd62ea..d1a5e09d 100644 --- a/bin/input/src/input.rs +++ b/bin/input/src/input.rs @@ -1,15 +1,17 @@ use anyhow::Context; use enigo::{ Axis::Horizontal, - Coordinate::Abs, + Coordinate::Rel, Direction::{Press, Release}, Enigo, Keyboard, Mouse, Settings, }; -use moq_transport::serve::{ - DatagramsReader, GroupsReader, ObjectsReader, StreamReader, TrackReader, TrackReaderMode, -}; -use serde::{Deserialize, Serialize}; +use moq_transport:: + serve::{ + DatagramsReader, GroupsReader, ObjectsReader, + StreamReader, TrackReader, TrackReaderMode, + }; +use serde::{Deserialize, Serialize}; pub struct Subscriber { track: TrackReader, } @@ -28,8 +30,54 @@ impl Subscriber { Self { track } } + // pub async fn run(self) -> anyhow::Result<()> { + // loop { + // match self.track.mode().await { + // Ok(mode) => match mode { + // TrackReaderMode::Stream(stream) => loop { + // if let Err(err) = Self::recv_stream(stream.clone()).await { + // tracing::warn!("Error receiving streams: {}, retrying...", err); + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // } else { + // break; + // } + // }, + // TrackReaderMode::Groups(groups) => loop { + // if let Err(err) = Self::recv_groups(groups.clone()).await { + // tracing::warn!("Error receiving groups: {}, retrying...", err); + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // } else { + // break; + // } + // }, + // TrackReaderMode::Objects(objects) => loop { + // if let Err(err) = Self::recv_objects(objects.clone()).await { + // tracing::warn!("Error receiving objects: {}, retrying...", err); + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // } else { + // break; + // } + // }, + // TrackReaderMode::Datagrams(datagrams) => loop { + // if let Err(err) = Self::recv_datagrams(datagrams.clone()).await { + // tracing::warn!("Error receiving datagrams: {}, retrying...", err); + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // } else { + // break; + // } + // }, + // }, + // Err(_) => { + // tracing::warn!("Failed to get mode, retrying..."); + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // return Ok(()); + // } + // } + // } + // } + pub async fn run(self) -> anyhow::Result<()> { - match self.track.mode().await.context("failed to get mode")? { + match self.track.mode().await.context("failed to connect")? { TrackReaderMode::Stream(stream) => Self::recv_stream(stream).await, TrackReaderMode::Groups(groups) => Self::recv_groups(groups).await, TrackReaderMode::Objects(objects) => Self::recv_objects(objects).await, @@ -39,9 +87,7 @@ impl Subscriber { async fn recv_stream(mut track: StreamReader) -> anyhow::Result<()> { while let Some(mut group) = track.next().await? { - println!("received a stream"); while let Some(object) = group.read_next().await? { - println!("received a stream 1"); let str = String::from_utf8_lossy(&object); println!("{}", str); } @@ -70,7 +116,7 @@ impl Subscriber { "mouse_move" => { if let (Some(x), Some(y)) = (parsed.delta_x, parsed.delta_y) { // println!("Handling mouse_move with delta_x: {}, delta_y: {}", x, y); - enigo.move_mouse(x, y, Abs).unwrap(); + enigo.move_mouse(x, y, Rel).unwrap(); } } "mouse_key_down" => { @@ -218,3 +264,14 @@ pub fn key_to_enigo(key: u8) -> Option { _ => None, } } + +//NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}" +// let _name = env::var("NAMESPACE").unwrap_or_else(|_| { +// let rng = rand::thread_rng(); +// let random_string: String = rng +// .sample_iter(&rand::distributions::Alphanumeric) +// .take(16) +// .map(char::from) +// .collect(); +// random_string +// }); \ No newline at end of file diff --git a/bin/input/src/main.rs b/bin/input/src/main.rs index 606a5aa8..aa216e04 100644 --- a/bin/input/src/main.rs +++ b/bin/input/src/main.rs @@ -1,14 +1,13 @@ -use moq_transport::{serve, session::Subscriber}; +use anyhow::Context; +use chrono::prelude::*; +use clap::Parser; use moq_native::quic; +use moq_transport::{serve, session::Subscriber}; use std::net; use url::Url; -use anyhow::Context; -use clap::Parser; - mod input; - #[derive(Parser, Clone)] pub struct Cli { /// Listen for UDP packets on the given address. @@ -53,26 +52,70 @@ async fn main() -> anyhow::Result<()> { tls, })?; - log::info!("connecting to server: url={}", config.url); - - let session = quic.client.connect(&config.url).await?; - - let (session, mut subscriber) = Subscriber::connect(session) - .await - .context("failed to create MoQ Transport session")?; - - let namespace = format!("{}input", config.namespace); - - let (prod, sub) = serve::Track::new(namespace, config.track).produce(); - - let input = input::Subscriber::new(sub); - - //TODO: Make sure to retry until the input server comes [Use Supervisord for now] - tokio::select! { - res = session.run() => res.context("session error")?, - res = input.run() => res.context("input error")?, - res = subscriber.subscribe(prod) => res.context("failed to subscribe to track")?, + let start = Utc::now(); + let mut now = start; + + loop { + log::info!("connecting to server: url={}", config.url); + + let session = quic.client.connect(&config.url).await?; + + let (session, mut subscriber) = Subscriber::connect(session) + .await + .context("failed to create MoQ Transport session")?; + + let namespace = format!("{}input", config.namespace); + + let (prod, sub) = serve::Track::new(namespace, config.track.clone()).produce(); + + let input = input::Subscriber::new(sub); + + // let (session, mut publisher) = Publisher::connect(session) + // .await + // .context("failed to create MoQ Transport session")?; + + // let (mut writer, _, reader) = serve::Tracks { + // namespace: config.namespace.clone(), + // } + // .produce(); + + // let track = writer.create(&config.track).unwrap(); + // let input_publisher = input::Publisher::new(track.groups()?); + + tokio::select! { + res = session.run() => { + if let Err(e) = res { + log::error!("session error: {}", e); + continue; + } + }, + res = input.run() => { + if let Err(e) = res { + log::error!("input subscriber error: {}", e); + continue; + } + }, + // res = publisher.announce(reader) => res.context("failed to serve tracks")?, + res = subscriber.subscribe(prod) => { + if let Err(e) = res { + log::error!("failed to subscribe to track: {}", e); + continue; + } + }, + } + + let next = now + chrono::Duration::try_minutes(1).unwrap(); + let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap(); + + let delay = (next - now).to_std().unwrap(); + tokio::time::sleep(delay).await; + + // if next.minute() - now.minute() == 10 { + // return Ok(()); + // } + + now = next; // just assume we didn't undersleep } - Ok(()) + // Ok(()) }