From 0324e3d399e75889eabf201bd65a3ca9db6bd273 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Mon, 18 Sep 2023 05:40:45 +0000 Subject: [PATCH] Seprate tx/rx async drivers; event_queue access for tx/rx drivers --- src/can.rs | 8 + src/uart.rs | 454 ++++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 410 insertions(+), 52 deletions(-) diff --git a/src/can.rs b/src/can.rs index cd73a6c66f1..59a425eea94 100644 --- a/src/can.rs +++ b/src/can.rs @@ -615,6 +615,14 @@ where }) } + pub fn driver(&self) -> &CanDriver<'d> { + self.driver.borrow() + } + + pub fn driver_mut(&mut self) -> &mut CanDriver<'d> { + self.driver.borrow_mut() + } + pub fn start(&mut self) -> Result<(), EspError> { self.driver.borrow_mut().start() } diff --git a/src/uart.rs b/src/uart.rs index f5cc634716e..bd23bba9423 100644 --- a/src/uart.rs +++ b/src/uart.rs @@ -641,6 +641,7 @@ unsafe impl<'d> Sync for UartDriver<'d> {} pub struct UartRxDriver<'d> { port: u8, owner: Owner, + queue: Option>, _p: PhantomData<&'d mut ()>, } @@ -648,6 +649,7 @@ pub struct UartRxDriver<'d> { pub struct UartTxDriver<'d> { port: u8, owner: Owner, + queue: Option>, _p: PhantomData<&'d mut ()>, } @@ -741,11 +743,19 @@ impl<'d> UartDriver<'d> { UartTxDriver { port: self.port, owner: Owner::Borrowed, + queue: self + .queue + .as_ref() + .map(|queue| unsafe { Queue::new_borrowed(queue.as_raw()) }), _p: PhantomData, }, UartRxDriver { port: self.port, owner: Owner::Borrowed, + queue: self + .queue + .as_ref() + .map(|queue| unsafe { Queue::new_borrowed(queue.as_raw()) }), _p: PhantomData, }, ) @@ -756,17 +766,27 @@ impl<'d> UartDriver<'d> { /// Unlike [`split`], the halves are owned and reference counted. pub fn into_split(self) -> (UartTxDriver<'d>, UartRxDriver<'d>) { let port = self.port; + let tx_queue = self + .queue + .as_ref() + .map(|queue| unsafe { Queue::new_borrowed(queue.as_raw()) }); + let rx_queue = self + .queue + .as_ref() + .map(|queue| unsafe { Queue::new_borrowed(queue.as_raw()) }); let _ = ManuallyDrop::new(self); REFS[port as usize].fetch_add(2, Ordering::SeqCst); ( UartTxDriver { port, owner: Owner::Shared, + queue: tx_queue, _p: PhantomData, }, UartRxDriver { port, owner: Owner::Shared, + queue: rx_queue, _p: PhantomData, }, ) @@ -842,6 +862,10 @@ impl<'d> UartDriver<'d> { ManuallyDrop::new(UartRxDriver { port: self.port, owner: Owner::Borrowed, + queue: self + .queue + .as_ref() + .map(|queue| unsafe { Queue::new_borrowed(queue.as_raw()) }), _p: PhantomData, }) } @@ -850,6 +874,10 @@ impl<'d> UartDriver<'d> { ManuallyDrop::new(UartTxDriver { port: self.port, owner: Owner::Borrowed, + queue: self + .queue + .as_ref() + .map(|queue| unsafe { Queue::new_borrowed(queue.as_raw()) }), _p: PhantomData, }) } @@ -936,15 +964,43 @@ impl<'d> UartRxDriver<'d> { rts: Option + 'd>, config: &config::Config, ) -> Result { - new_common(uart, None::, Some(rx), cts, rts, config, None)?; + let mut q_handle_raw = ptr::null_mut(); + let q_handle = if config.queue_size > 0 { + Some(&mut q_handle_raw) + } else { + None + }; + new_common( + uart, + None::, + Some(rx), + cts, + rts, + config, + q_handle, + )?; + + // SAFTEY: okay because Queue borrows self + // SAFETY: we can safely use UartEvent instead of uart_event_t because of repr(transparent) + let queue = match q_handle_raw.is_null() { + false => Some(unsafe { Queue::new_borrowed(q_handle_raw) }), + true => None, + }; Ok(Self { port: UART::port() as _, owner: Owner::Owned, + queue, _p: PhantomData, }) } + /// Retrieves the event queue for this UART. Returns `None` if + /// the config specified 0 for `queue_size`. + pub fn event_queue(&self) -> Option<&Queue> { + self.queue.as_ref() + } + /// Change the number of stop bits pub fn change_stop_bits(&self, stop_bits: config::StopBits) -> Result<&Self, EspError> { change_stop_bits(self.port(), stop_bits).map(|_| self) @@ -1084,15 +1140,43 @@ impl<'d> UartTxDriver<'d> { rts: Option + 'd>, config: &config::Config, ) -> Result { - new_common(uart, Some(tx), None::, cts, rts, config, None)?; + let mut q_handle_raw = ptr::null_mut(); + let q_handle = if config.queue_size > 0 { + Some(&mut q_handle_raw) + } else { + None + }; + new_common( + uart, + Some(tx), + None::, + cts, + rts, + config, + q_handle, + )?; + + // SAFTEY: okay because Queue borrows self + // SAFETY: we can safely use UartEvent instead of uart_event_t because of repr(transparent) + let queue = match q_handle_raw.is_null() { + false => Some(unsafe { Queue::new_borrowed(q_handle_raw) }), + true => None, + }; Ok(Self { port: UART::port() as _, owner: Owner::Owned, + queue, _p: PhantomData, }) } + /// Retrieves the event queue for this UART. Returns `None` if + /// the config specified 0 for `queue_size`. + pub fn event_queue(&self) -> Option<&Queue> { + self.queue.as_ref() + } + /// Change the number of stop bits pub fn change_stop_bits(&self, stop_bits: config::StopBits) -> Result<&Self, EspError> { change_stop_bits(self.port(), stop_bits).map(|_| self) @@ -1171,7 +1255,7 @@ impl<'d> UartTxDriver<'d> { } /// Waits until the transmission is complete or until the specified timeout expires. - pub fn wait_done(&mut self, timeout: TickType_t) -> Result<(), EspError> { + pub fn wait_done(&self, timeout: TickType_t) -> Result<(), EspError> { esp!(unsafe { uart_wait_tx_done(self.port(), timeout) })?; Ok(()) @@ -1294,32 +1378,49 @@ where priority: Option, pin_to_core: Option, ) -> Result { - if let Some(queue) = driver.borrow().event_queue() { - let port = driver.borrow().port as usize; + let task = new_task_common( + driver.borrow().port, + driver.borrow().event_queue(), + priority, + pin_to_core, + )?; - unsafe { - QUEUES[port] = queue.as_raw() as _; - } + Ok(Self { + driver, + task, + _data: PhantomData, + }) + } - let task = unsafe { - task::create( - Self::process_events, - CStr::from_bytes_until_nul(b"UART - Events task\0").unwrap(), - 2048, - port as _, - priority.unwrap_or(6), - pin_to_core, - )? - }; - - Ok(Self { - driver, - task, + pub fn driver(&self) -> &UartDriver<'d> { + self.driver.borrow() + } + + pub fn driver_mut(&mut self) -> &mut UartDriver<'d> { + self.driver.borrow_mut() + } + + /// Split the serial driver in separate TX and RX drivers + pub fn split( + &self, + ) -> ( + AsyncUartTxDriver<'_, UartTxDriver<'_>>, + AsyncUartRxDriver<'_, UartRxDriver<'_>>, + ) { + let (tx, rx) = self.driver().split(); + + ( + AsyncUartTxDriver { + driver: tx, + task: None, _data: PhantomData, - }) - } else { - Err(EspError::from_infallible::()) - } + }, + AsyncUartRxDriver { + driver: rx, + task: None, + _data: PhantomData, + }, + ) } pub async fn read(&self, buf: &mut [u8]) -> Result { @@ -1360,28 +1461,6 @@ where TX_NOTIFS[port].wait().await; } } - - extern "C" fn process_events(arg: *mut core::ffi::c_void) { - let port: usize = arg as _; - let queue: Queue = unsafe { Queue::new_borrowed(QUEUES[port] as _) }; - - loop { - if let Some((event, _)) = queue.recv_front(delay::BLOCK) { - match event.payload() { - UartEventPayload::Data { .. } - | UartEventPayload::RxBufferFull - | UartEventPayload::RxFifoOverflow => { - READ_NOTIFS[port].notify(); - } - UartEventPayload::Break | UartEventPayload::DataBreak => { - WRITE_NOTIFS[port].notify(); - TX_NOTIFS[port].notify(); - } - _ => (), - } - } - } - } } impl<'d, T> Drop for AsyncUartDriver<'d, T> @@ -1389,10 +1468,7 @@ where T: BorrowMut>, { fn drop(&mut self) { - unsafe { - task::destroy(self.task); - QUEUES[self.driver.borrow().port as usize] = core::ptr::null_mut(); - } + drop_task_common(self.task, self.driver.borrow().port); } } @@ -1429,6 +1505,280 @@ where } } +pub struct AsyncUartRxDriver<'d, T> +where + T: BorrowMut>, +{ + driver: T, + task: Option, + _data: PhantomData<&'d ()>, +} + +impl<'d> AsyncUartRxDriver<'d, UartRxDriver<'d>> { + pub fn new( + uart: impl Peripheral

+ 'd, + rx: impl Peripheral

+ 'd, + cts: Option + 'd>, + rts: Option + 'd>, + config: &config::Config, + ) -> Result { + Self::wrap(UartRxDriver::new(uart, rx, cts, rts, config)?) + } +} + +impl<'d, T> AsyncUartRxDriver<'d, T> +where + T: BorrowMut>, +{ + pub fn wrap(driver: T) -> Result { + Self::wrap_custom(driver, None, None) + } + + pub fn wrap_custom( + driver: T, + priority: Option, + pin_to_core: Option, + ) -> Result { + let task = new_task_common( + driver.borrow().port, + driver.borrow().event_queue(), + priority, + pin_to_core, + )?; + + Ok(Self { + driver, + task: Some(task), + _data: PhantomData, + }) + } + + pub fn driver(&self) -> &UartRxDriver<'d> { + self.driver.borrow() + } + + pub fn driver_mut(&mut self) -> &mut UartRxDriver<'d> { + self.driver.borrow_mut() + } + + pub async fn read(&self, buf: &mut [u8]) -> Result { + loop { + match self.driver.borrow().read(buf, delay::NON_BLOCK) { + Ok(len) => return Ok(len), + Err(e) if e.code() != ESP_ERR_TIMEOUT => return Err(e), + _ => (), + } + + let port = self.driver.borrow().port as usize; + READ_NOTIFS[port].wait().await; + } + } +} + +impl<'d, T> Drop for AsyncUartRxDriver<'d, T> +where + T: BorrowMut>, +{ + fn drop(&mut self) { + if let Some(task) = self.task { + drop_task_common(task, self.driver.borrow().port); + } + } +} + +impl<'d, T> embedded_io::ErrorType for AsyncUartRxDriver<'d, T> +where + T: BorrowMut>, +{ + type Error = EspIOError; +} + +#[cfg(feature = "nightly")] +impl<'d, T> embedded_io_async::Read for AsyncUartRxDriver<'d, T> +where + T: BorrowMut>, +{ + async fn read(&mut self, buf: &mut [u8]) -> Result { + AsyncUartRxDriver::read(self, buf).await.map_err(EspIOError) + } +} + +pub struct AsyncUartTxDriver<'d, T> +where + T: BorrowMut>, +{ + driver: T, + task: Option, + _data: PhantomData<&'d ()>, +} + +impl<'d> AsyncUartTxDriver<'d, UartTxDriver<'d>> { + pub fn new( + uart: impl Peripheral

+ 'd, + tx: impl Peripheral

+ 'd, + cts: Option + 'd>, + rts: Option + 'd>, + config: &config::Config, + ) -> Result { + Self::wrap(UartTxDriver::new(uart, tx, cts, rts, config)?) + } +} + +impl<'d, T> AsyncUartTxDriver<'d, T> +where + T: BorrowMut>, +{ + pub fn wrap(driver: T) -> Result { + Self::wrap_custom(driver, None, None) + } + + pub fn wrap_custom( + driver: T, + priority: Option, + pin_to_core: Option, + ) -> Result { + let task = new_task_common( + driver.borrow().port, + driver.borrow().event_queue(), + priority, + pin_to_core, + )?; + + Ok(Self { + driver, + task: Some(task), + _data: PhantomData, + }) + } + + pub fn driver(&self) -> &UartTxDriver<'d> { + self.driver.borrow() + } + + pub fn driver_mut(&mut self) -> &mut UartTxDriver<'d> { + self.driver.borrow_mut() + } + + pub async fn write(&self, bytes: &[u8]) -> Result { + loop { + match self.driver.borrow().write_nb(bytes) { + Ok(len) if len > 0 => return Ok(len), + Err(e) => return Err(e), + _ => (), + } + + let port = self.driver.borrow().port as usize; + WRITE_NOTIFS[port].wait().await; + } + } + + pub async fn wait_done(&self) -> Result<(), EspError> { + loop { + match self.driver.borrow().wait_done(delay::NON_BLOCK) { + Ok(()) => return Ok(()), + Err(e) if e.code() != ESP_ERR_TIMEOUT => return Err(e), + _ => (), + } + + let port = self.driver.borrow().port as usize; + TX_NOTIFS[port].wait().await; + } + } +} + +impl<'d, T> Drop for AsyncUartTxDriver<'d, T> +where + T: BorrowMut>, +{ + fn drop(&mut self) { + if let Some(task) = self.task { + drop_task_common(task, self.driver.borrow().port); + } + } +} + +impl<'d, T> embedded_io::ErrorType for AsyncUartTxDriver<'d, T> +where + T: BorrowMut>, +{ + type Error = EspIOError; +} + +#[cfg(feature = "nightly")] +impl<'d, T> embedded_io_async::Write for AsyncUartTxDriver<'d, T> +where + T: BorrowMut>, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + AsyncUartTxDriver::write(self, buf) + .await + .map_err(EspIOError) + } + + async fn flush(&mut self) -> Result<(), Self::Error> { + AsyncUartTxDriver::wait_done(self).await.map_err(EspIOError) + } +} + +fn new_task_common( + port: u8, + queue: Option<&Queue>, + priority: Option, + pin_to_core: Option, +) -> Result { + if let Some(queue) = queue { + let port = port as usize; + + let task = unsafe { + task::create( + process_events, + CStr::from_bytes_until_nul(b"UART - Events task\0").unwrap(), + 2048, + port as _, + priority.unwrap_or(6), + pin_to_core, + )? + }; + + unsafe { + QUEUES[port] = queue.as_raw() as _; + } + + Ok(task) + } else { + Err(EspError::from_infallible::()) + } +} + +fn drop_task_common(task: TaskHandle_t, port: u8) { + unsafe { + task::destroy(task); + QUEUES[port as usize] = core::ptr::null_mut(); + } +} + +extern "C" fn process_events(arg: *mut core::ffi::c_void) { + let port: usize = arg as _; + let queue: Queue = unsafe { Queue::new_borrowed(QUEUES[port] as _) }; + + loop { + if let Some((event, _)) = queue.recv_front(delay::BLOCK) { + match event.payload() { + UartEventPayload::Data { .. } + | UartEventPayload::RxBufferFull + | UartEventPayload::RxFifoOverflow => { + READ_NOTIFS[port].notify(); + } + UartEventPayload::Break | UartEventPayload::DataBreak => { + WRITE_NOTIFS[port].notify(); + TX_NOTIFS[port].notify(); + } + _ => (), + } + } + } +} + fn new_common( _uart: impl Peripheral

, tx: Option>,