diff --git a/src/flightaware.rs b/src/flightaware.rs index 74fe4f4..5682cc1 100644 --- a/src/flightaware.rs +++ b/src/flightaware.rs @@ -59,11 +59,13 @@ fn get_flightplan_from_json(data: &Value) -> Option { }); } +#[derive(Debug)] struct FlightPlanRequest { id: String, callsign: String } +#[derive(Debug)] pub struct FlightPlanResult { pub id: String, pub callsign: String, @@ -85,7 +87,7 @@ impl FlightAware { pub fn new() -> Self { Self { client: Arc::new(Mutex::new(Client::new())), - flightplans: Request::new() + flightplans: Request::new(5) } } diff --git a/src/noaa.rs b/src/noaa.rs index ca1745c..b11c8f2 100644 --- a/src/noaa.rs +++ b/src/noaa.rs @@ -22,7 +22,7 @@ pub struct NoaaWeather { impl NoaaWeather { pub fn new() -> Self { Self { - weather_request: Request::new(), + weather_request: Request::new(1), client: Arc::new(Mutex::new(Client::new())) } } diff --git a/src/request.rs b/src/request.rs index 0f3935e..e58e661 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,4 +1,4 @@ -use std::thread; +use std::{thread, sync::Arc}; use crossbeam_channel::{unbounded, Sender, Receiver}; use crossbeam_deque::{Worker, Steal}; @@ -6,35 +6,44 @@ use crossbeam_deque::{Worker, Steal}; pub struct Request { tx: Sender, rx: Receiver, - worker: Worker + worker: Worker, + num_threads: u32 } impl Request where T: Send + 'static, J: Send + 'static { - pub fn new() -> Self { + pub fn new(num_threads: u32) -> Self { let (tx, rx) = unbounded(); Self { rx, tx, - worker: Worker::new_fifo() + worker: Worker::new_fifo(), + num_threads } } pub fn run(&self, worker: F) where - F: Fn(J) -> T + Send + 'static { - let result_transmitter = self.tx.clone(); - let s = self.worker.stealer(); - - thread::spawn(move || { - loop { - match s.steal() { - Steal::Success(job) => { - result_transmitter.send(worker(job)).ok(); - }, - _ => () - } - thread::sleep(std::time::Duration::from_millis(10)); + F: Fn(J) -> T + Send + Sync + 'static { + let worker = Arc::new(worker); + + // Spawn worker threads to read from queue + (0..self.num_threads).for_each(|_| { + let s = self.worker.stealer(); + let result_transmitter = self.tx.clone(); + let worker = worker.clone(); + // Process tasks + thread::spawn(move || { + loop { + match s.steal() { + Steal::Success(job) => { + result_transmitter.send(worker(job)).ok(); + }, + _ => () + } + thread::sleep(std::time::Duration::from_millis(10)); + } + }); } - }); + ); } pub fn get_next(&self) -> Option {