diff --git a/Cargo.lock b/Cargo.lock index 6acf5ec..25627c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -506,6 +506,26 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -924,6 +944,7 @@ dependencies = [ "futures", "hex", "log", + "pin-project", "serde", "serde_json", "tokio", diff --git a/zvt/Cargo.toml b/zvt/Cargo.toml index bfb70fe..e98a6ea 100644 --- a/zvt/Cargo.toml +++ b/zvt/Cargo.toml @@ -27,4 +27,5 @@ async-stream = "0.3.5" serde = { version = "1.0.185", features = ["derive"] } serde_json = "1.0.105" futures = "0.3.28" +pin-project = "1.1.3" diff --git a/zvt/src/feig/sequences.rs b/zvt/src/feig/sequences.rs index 198ca69..35994b0 100644 --- a/zvt/src/feig/sequences.rs +++ b/zvt/src/feig/sequences.rs @@ -1,16 +1,15 @@ -use crate::sequences::{read_packet_async, write_packet_async, write_with_ack_async, Sequence}; -use crate::{packets, ZvtEnum, ZvtParser }; +use crate::sequences::DataSource; +use crate::sequences::Sequence; +use crate::{packets, ZvtEnum, ZvtParser}; use anyhow::Result; use async_stream::try_stream; use std::boxed::Box; use std::collections::HashMap; use std::io::Seek; use std::io::{Error, ErrorKind}; -use std::marker::Unpin; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::pin::Pin; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_stream::Stream; use zvt_builder::ZVTError; @@ -90,15 +89,12 @@ pub enum WriteFileResponse { } impl WriteFile { - pub fn into_stream( + pub fn into_stream( path: PathBuf, password: usize, adpu_size: u32, - src: &mut Source, - ) -> Pin> + '_>> - where - Source: AsyncReadExt + AsyncWriteExt + Unpin, - { + src: &mut DataSource, + ) -> Pin> + '_>> { // Protocol from the handbook (the numbering is not part of the handbook) // 1.1 ECR->PT: Send over the list of all files with their sizes. // 1.2 PT->ECR: Ack @@ -133,25 +129,25 @@ impl WriteFile { }; // 1.1. and 1.2 - write_with_ack_async(&packet, &mut src).await?; + src.write_with_ack_async(&packet).await?; let mut buf = vec![0; adpu_size as usize]; println!("the length is {}", buf.len()); loop { // Get the data. - let bytes = read_packet_async(&mut src).await?; + let bytes = src.read_packet_async().await?; println!("The packet is {:?}", bytes); let response = WriteFileResponse::zvt_parse(&bytes)?; match response { WriteFileResponse::CompletionData(_) => { - write_packet_async(&mut src, &packets::Ack {}).await?; + src.write_packet_async(&packets::Ack {}).await?; yield response; break; } WriteFileResponse::Abort(_) => { - write_packet_async(&mut src, &packets::Ack {}).await?; + src.write_packet_async(&packets::Ack {}).await?; yield response; break; } @@ -193,7 +189,7 @@ impl WriteFile { }), }), }; - write_packet_async(&mut src, &packet).await?; + src.write_packet_async(&packet).await?; yield response; } diff --git a/zvt/src/sequences.rs b/zvt/src/sequences.rs index 8d24627..7aac968 100644 --- a/zvt/src/sequences.rs +++ b/zvt/src/sequences.rs @@ -7,7 +7,7 @@ use log::debug; use std::boxed::Box; use std::marker::Unpin; use std::pin::Pin; -use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; +use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; pub async fn read_packet_async(src: &mut Pin<&mut impl AsyncReadExt>) -> Result> { let mut buf = vec![0; 3]; @@ -65,21 +65,41 @@ where Ok(()) } -struct DataSource - where T: AsyncReadExt + AsyncWriteExt + Unpin { - s: T, +trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Sync + Send {} + +#[pin_project::pin_project] +pub struct DataSource { + #[pin] + s: Box, +} + +impl DataSource { + pub async fn read_packet_async(self: Pin<&mut Self>) -> Result> { + let this = self.project(); + let mut pinned_inner = this.s; + Ok(read_packet_async(&mut pinned_inner).await?) } -// TODO(hrapp): I wanted to wrap the functions "write_packet_async" and "read_packet_async" into -// a struct that can take any `T` and has the more type "read_packet" and "write_packet". However I -// completely failed to do so because of pin!(). How do I do that?!? -impl DataSource - where T: AsyncReadExt + AsyncWriteExt + Unpin { - pub async fn read_packet_async(self: Pin<&mut Self>) -> Result> { - read_packet_async(self.s).await? - } + pub async fn write_packet_async

(self: Pin<&mut Self>, p: &P) -> Result<()> + where + P: ZvtSerializer + Sync + Send, + encoding::Default: encoding::Encoding

, + { + let this = self.project(); + let mut pinned_inner = this.s; + Ok(write_packet_async(&mut pinned_inner, p).await?) } + pub async fn write_with_ack_async

(self: Pin<&mut Self>, p: &P) -> Result<()> + where + P: ZvtSerializer + Sync + Send, + encoding::Default: encoding::Encoding

, + { + let this = self.project(); + let mut pinned_inner = this.s; + Ok(write_with_ack_async(&mut pinned_inner, p).await?) + } +} /// The trait for converting a sequence into a stream. ///