Skip to content

Commit

Permalink
Funnel all writing through one function and add logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
SirVer committed Dec 7, 2023
1 parent d9b47b1 commit 5664021
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 23 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion zvt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ async-stream = "0.3.5"
serde = { version = "1.0.185", features = ["derive"] }
serde_json = "1.0.105"
futures = "0.3.28"

pretty-hex = "0.4.0"
12 changes: 5 additions & 7 deletions zvt/src/feig/sequences.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::sequences::{read_packet_async, write_with_ack_async, Sequence};
use crate::{packets, ZvtEnum, ZvtParser, ZvtSerializer};
use crate::sequences::{read_packet_async, write_packet_async, write_with_ack_async, Sequence};
use crate::{packets, ZvtEnum, ZvtParser};
use anyhow::Result;
use async_stream::try_stream;
use std::boxed::Box;
Expand Down Expand Up @@ -146,14 +146,12 @@ impl WriteFile {

match response {
WriteFileResponse::CompletionData(_) => {
src.write_all(&packets::Ack {}.zvt_serialize()).await?;

write_packet_async(&mut src, &packets::Ack {}).await?;
yield response;
break;
}
WriteFileResponse::Abort(_) => {
src.write_all(&packets::Ack {}.zvt_serialize()).await?;

write_packet_async(&mut src, &packets::Ack {}).await?;
yield response;
break;
}
Expand Down Expand Up @@ -195,7 +193,7 @@ impl WriteFile {
}),
}),
};
src.write_all(&packet.zvt_serialize()).await?;
write_packet_async(&mut src, &packet).await?;

yield response;
}
Expand Down
46 changes: 31 additions & 15 deletions zvt/src/sequences.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::{encoding, ZvtEnum, ZvtParser, ZvtSerializer};
use anyhow::Result;
use async_stream::try_stream;
use futures::Stream;
use log::debug;
use log::{log_enabled, debug, Level::Debug};
use std::boxed::Box;
use std::marker::Unpin;
use std::pin::Pin;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

pub async fn read_packet_async(src: &mut Pin<&mut impl AsyncReadExt>) -> Result<Vec<u8>> {
let mut buf = vec![0; 3];
Expand All @@ -26,11 +26,28 @@ pub async fn read_packet_async(src: &mut Pin<&mut impl AsyncReadExt>) -> Result<
buf.resize(start + len, 0);
src.read_exact(&mut buf[start..]).await?;

debug!("Received {buf:?}");
if log_enabled!(Debug) {
debug!("RX: {}", pretty_hex::simple_hex(&buf));
}

Ok(buf.to_vec())
}

pub async fn write_packet_async<T>(
drain: &mut Pin<&mut impl AsyncWriteExt>,
p: &T,
) -> io::Result<()>
where
T: ZvtSerializer + Sync + Send,
encoding::Default: encoding::Encoding<T>,
{
let bytes = p.zvt_serialize();
if log_enabled!(Debug) {
debug!("TX: {}", pretty_hex::simple_hex(&bytes));
}
drain.write_all(&bytes).await
}

#[derive(ZvtEnum)]
enum Ack {
Ack(packets::Ack),
Expand All @@ -46,8 +63,7 @@ where
{
// We declare the bytes as a separate variable to help the compiler to
// figure out that we can send stuff between threads.
let bytes = p.zvt_serialize();
src.write_all(&bytes).await?;
write_packet_async(src, p).await?;

let bytes = read_packet_async(src).await?;
let _ = Ack::zvt_parse(&bytes)?;
Expand Down Expand Up @@ -91,7 +107,7 @@ where
let bytes = read_packet_async(&mut src).await?;
let packet = Self::Output::zvt_parse(&bytes)?;
// Write the response.
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async::<packets::Ack>(&mut src, &packets::Ack {}).await?;
yield packet;
};
Box::pin(s)
Expand Down Expand Up @@ -149,7 +165,7 @@ impl Sequence for ReadCard {
let bytes = read_packet_async(&mut src).await?;
let packet = ReadCardResponse::zvt_parse(&bytes)?;
// Write the response.
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;

match packet {
ReadCardResponse::StatusInformation(_) | ReadCardResponse::Abort(_) => {
Expand Down Expand Up @@ -206,7 +222,7 @@ impl Sequence for Initialization {
let response = InitializationResponse::zvt_parse(&bytes)?;

// Every message requires an Ack.
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;

match response {
InitializationResponse::CompletionData(_)
Expand Down Expand Up @@ -313,7 +329,7 @@ impl Sequence for Diagnosis {
let response = DiagnosisResponse::zvt_parse(&bytes)?;

// Every message requires an Ack.
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;

match response {
DiagnosisResponse::CompletionData(_)
Expand Down Expand Up @@ -380,7 +396,7 @@ impl Sequence for EndOfDay {
let packet = EndOfDayResponse::zvt_parse(&bytes)?;

// Write the response.
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;
match packet {
EndOfDayResponse::CompletionData(_) | EndOfDayResponse::Abort(_) => {
yield packet;
Expand Down Expand Up @@ -445,7 +461,7 @@ impl Sequence for Reservation {
loop {
let bytes = read_packet_async(&mut src).await?;
let packet = AuthorizationResponse::zvt_parse(&bytes)?;
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;
match packet {
AuthorizationResponse::CompletionData(_) | AuthorizationResponse::Abort(_) => {
yield packet;
Expand Down Expand Up @@ -515,7 +531,7 @@ impl Sequence for PartialReversal {
loop {
let bytes = read_packet_async(&mut src).await?;
let packet = PartialReversalResponse::zvt_parse(&bytes)?;
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;
match packet {
PartialReversalResponse::CompletionData(_)
| PartialReversalResponse::PartialReversalAbort(_) => {
Expand Down Expand Up @@ -555,7 +571,7 @@ impl Sequence for PreAuthReversal {
loop {
let bytes = read_packet_async(&mut src).await?;
let packet = PartialReversalResponse::zvt_parse(&bytes)?;
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;
match packet {
PartialReversalResponse::CompletionData(_)
| PartialReversalResponse::PartialReversalAbort(_) => {
Expand Down Expand Up @@ -606,7 +622,7 @@ impl Sequence for PrintSystemConfiguration {
loop {
let bytes = read_packet_async(&mut src).await?;
let packet = PrintSystemConfigurationResponse::zvt_parse(&bytes)?;
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;
match packet {
PrintSystemConfigurationResponse::CompletionData(_) => {
yield packet;
Expand Down Expand Up @@ -677,7 +693,7 @@ impl Sequence for StatusEnquiry {
loop {
let bytes = read_packet_async(&mut src).await?;
let packet = StatusEnquiryResponse::zvt_parse(&bytes)?;
src.write_all(&packets::Ack {}.zvt_serialize()).await?;
write_packet_async(&mut src, &packets::Ack {}).await?;
match packet {
StatusEnquiryResponse::CompletionData(_) => {
yield packet;
Expand Down

0 comments on commit 5664021

Please sign in to comment.