Skip to content

Commit

Permalink
Next blocker.
Browse files Browse the repository at this point in the history
  • Loading branch information
SirVer committed Dec 7, 2023
1 parent 5253bf9 commit 9158527
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 27 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions zvt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ async-stream = "0.3.5"
serde = { version = "1.0.185", features = ["derive"] }
serde_json = "1.0.105"
futures = "0.3.28"
pin-project = "1.1.3"

26 changes: 11 additions & 15 deletions zvt/src/feig/sequences.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::sequences::{read_packet_async, write_packet_async, write_with_ack_async, Sequence};
use crate::{packets, ZvtEnum, ZvtParser };
use crate::sequences::DataSource;
use crate::sequences::Sequence;
use crate::{packets, ZvtEnum, ZvtParser};
use anyhow::Result;
use async_stream::try_stream;
use std::boxed::Box;
use std::collections::HashMap;
use std::io::Seek;
use std::io::{Error, ErrorKind};
use std::marker::Unpin;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_stream::Stream;
use zvt_builder::ZVTError;

Expand Down Expand Up @@ -90,15 +89,12 @@ pub enum WriteFileResponse {
}

impl WriteFile {
pub fn into_stream<Source>(
pub fn into_stream(
path: PathBuf,
password: usize,
adpu_size: u32,
src: &mut Source,
) -> Pin<Box<impl Stream<Item = Result<WriteFileResponse>> + '_>>
where
Source: AsyncReadExt + AsyncWriteExt + Unpin,
{
src: &mut DataSource,
) -> Pin<Box<impl Stream<Item = Result<WriteFileResponse>> + '_>> {
// Protocol from the handbook (the numbering is not part of the handbook)
// 1.1 ECR->PT: Send over the list of all files with their sizes.
// 1.2 PT->ECR: Ack
Expand Down Expand Up @@ -133,25 +129,25 @@ impl WriteFile {
};

// 1.1. and 1.2
write_with_ack_async(&packet, &mut src).await?;
src.write_with_ack_async(&packet).await?;
let mut buf = vec![0; adpu_size as usize];
println!("the length is {}", buf.len());

loop {
// Get the data.
let bytes = read_packet_async(&mut src).await?;
let bytes = src.read_packet_async().await?;
println!("The packet is {:?}", bytes);

let response = WriteFileResponse::zvt_parse(&bytes)?;

match response {
WriteFileResponse::CompletionData(_) => {
write_packet_async(&mut src, &packets::Ack {}).await?;
src.write_packet_async(&packets::Ack {}).await?;
yield response;
break;
}
WriteFileResponse::Abort(_) => {
write_packet_async(&mut src, &packets::Ack {}).await?;
src.write_packet_async(&packets::Ack {}).await?;
yield response;
break;
}
Expand Down Expand Up @@ -193,7 +189,7 @@ impl WriteFile {
}),
}),
};
write_packet_async(&mut src, &packet).await?;
src.write_packet_async(&packet).await?;

yield response;
}
Expand Down
44 changes: 32 additions & 12 deletions zvt/src/sequences.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use log::debug;
use std::boxed::Box;
use std::marker::Unpin;
use std::pin::Pin;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

pub async fn read_packet_async(src: &mut Pin<&mut impl AsyncReadExt>) -> Result<Vec<u8>> {
let mut buf = vec![0; 3];
Expand Down Expand Up @@ -65,21 +65,41 @@ where
Ok(())
}

struct DataSource<T>
where T: AsyncReadExt + AsyncWriteExt + Unpin {
s: T,
trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Sync + Send {}

#[pin_project::pin_project]
pub struct DataSource {
#[pin]
s: Box<dyn AsyncReadWrite>,
}

impl DataSource {
pub async fn read_packet_async(self: Pin<&mut Self>) -> Result<Vec<u8>> {
let this = self.project();
let mut pinned_inner = this.s;
Ok(read_packet_async(&mut pinned_inner).await?)
}

// TODO(hrapp): I wanted to wrap the functions "write_packet_async" and "read_packet_async" into
// a struct that can take any `T` and has the more type "read_packet" and "write_packet". However I
// completely failed to do so because of pin!(). How do I do that?!?
impl<T> DataSource<T>
where T: AsyncReadExt + AsyncWriteExt + Unpin {
pub async fn read_packet_async(self: Pin<&mut Self>) -> Result<Vec<u8>> {
read_packet_async(self.s).await?
}
pub async fn write_packet_async<P>(self: Pin<&mut Self>, p: &P) -> Result<()>
where
P: ZvtSerializer + Sync + Send,
encoding::Default: encoding::Encoding<P>,
{
let this = self.project();
let mut pinned_inner = this.s;
Ok(write_packet_async(&mut pinned_inner, p).await?)
}

pub async fn write_with_ack_async<P>(self: Pin<&mut Self>, p: &P) -> Result<()>
where
P: ZvtSerializer + Sync + Send,
encoding::Default: encoding::Encoding<P>,
{
let this = self.project();
let mut pinned_inner = this.s;
Ok(write_with_ack_async(&mut pinned_inner, p).await?)
}
}

/// The trait for converting a sequence into a stream.
///
Expand Down

0 comments on commit 9158527

Please sign in to comment.