diff --git a/zvt/Cargo.toml b/zvt/Cargo.toml index 1a605ec..5fd85cb 100644 --- a/zvt/Cargo.toml +++ b/zvt/Cargo.toml @@ -27,4 +27,3 @@ async-stream = "0.3.5" serde = { version = "1.0.185", features = ["derive"] } serde_json = "1.0.105" futures = "0.3.28" - diff --git a/zvt/src/bin/feig_update/main.rs b/zvt/src/bin/feig_update/main.rs index aaf8e6d..cfeb119 100644 --- a/zvt/src/bin/feig_update/main.rs +++ b/zvt/src/bin/feig_update/main.rs @@ -5,7 +5,7 @@ use std::fs::read_to_string; use std::path::PathBuf; use tokio::net::TcpStream; use tokio_stream::StreamExt; -use zvt::{feig, packets, sequences, sequences::Sequence}; +use zvt::{feig, io, packets, sequences, sequences::Sequence}; /// Updates a feig terminal. #[derive(Parser)] @@ -53,7 +53,8 @@ async fn main() -> Result<()> { let args = Args::parse(); // Connect to the payment terminal. - let mut socket = TcpStream::connect(&args.ip_address).await?; + let source = TcpStream::connect(&args.ip_address).await?; + let mut socket = io::PacketTransport { source }; const MAX_LEN_ADPU: u16 = 1u16 << 15; let registration = packets::Registration { password: args.password, diff --git a/zvt/src/bin/status/main.rs b/zvt/src/bin/status/main.rs index ee8bff7..926dab0 100644 --- a/zvt/src/bin/status/main.rs +++ b/zvt/src/bin/status/main.rs @@ -4,7 +4,7 @@ use log::info; use std::io::Write; use tokio::net::TcpStream; use tokio_stream::StreamExt; -use zvt::{packets, sequences, sequences::Sequence}; +use zvt::{io, packets, sequences, sequences::Sequence}; #[derive(Parser, Debug)] struct Args { @@ -57,7 +57,8 @@ async fn main() -> std::io::Result<()> { let args = Args::parse(); info!("Using the args {:?}", args); - let mut socket = TcpStream::connect(args.ip).await?; + let source = TcpStream::connect(args.ip).await?; + let mut socket = io::PacketTransport { source }; let request = packets::Registration { password: args.password, diff --git a/zvt/src/feig/sequences.rs b/zvt/src/feig/sequences.rs index 232dd19..08959a6 100644 --- a/zvt/src/feig/sequences.rs +++ b/zvt/src/feig/sequences.rs @@ -1,5 +1,6 @@ -use crate::sequences::{read_packet_async, write_with_ack_async, Sequence}; -use crate::{packets, ZvtEnum, ZvtParser, ZvtSerializer}; +use crate::io::PacketTransport; +use crate::sequences::Sequence; +use crate::{packets, ZvtEnum}; use anyhow::Result; use async_stream::try_stream; use std::boxed::Box; @@ -94,10 +95,10 @@ impl WriteFile { path: PathBuf, password: usize, adpu_size: u32, - src: &mut Source, + src: &mut PacketTransport, ) -> Pin> + '_>> where - Source: AsyncReadExt + AsyncWriteExt + Unpin, + Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, { // Protocol from the handbook (the numbering is not part of the handbook) // 1.1 ECR->PT: Send over the list of all files with their sizes. @@ -108,8 +109,6 @@ impl WriteFile { // 3.0 PT->ERC replies with Completion. let s = try_stream! { - tokio::pin!(src); - use super::packets::tlv::File as TlvFile; let files = convert_dir(&path)?; let mut packets = Vec::with_capacity(files.len()); @@ -133,26 +132,23 @@ impl WriteFile { }; // 1.1. and 1.2 - write_with_ack_async(&packet, &mut src).await?; + src.write_packet_with_ack(&packet).await?; let mut buf = vec![0; adpu_size as usize]; println!("the length is {}", buf.len()); loop { // Get the data. - let bytes = read_packet_async(&mut src).await?; - println!("The packet is {:?}", bytes); - - let response = WriteFileResponse::zvt_parse(&bytes)?; + let response = src.read_packet().await?; match response { WriteFileResponse::CompletionData(_) => { - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + src.write_packet(&packets::Ack {}).await?; yield response; break; } WriteFileResponse::Abort(_) => { - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + src.write_packet(&packets::Ack {}).await?; yield response; break; @@ -195,7 +191,7 @@ impl WriteFile { }), }), }; - src.write_all(&packet.zvt_serialize()).await?; + src.write_packet(&packet).await?; yield response; } diff --git a/zvt/src/io.rs b/zvt/src/io.rs new file mode 100644 index 0000000..215f532 --- /dev/null +++ b/zvt/src/io.rs @@ -0,0 +1,95 @@ +use crate::packets; +use crate::ZvtEnum; +use crate::ZvtParser; +use anyhow::Result; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use zvt_builder::encoding; +use zvt_builder::ZvtSerializer; + +#[derive(ZvtEnum)] +pub enum Ack { + Ack(packets::Ack), +} + +pub struct PacketTransport { + pub source: Source, +} + +impl PacketTransport +where + S: AsyncReadExt + Unpin + Send, +{ + /// Reads an ADPU packet from the PT. + pub async fn read_packet(&mut self) -> Result + where + T: ZvtParser + Send, + { + let mut buf = vec![0; 3]; + self.source.read_exact(&mut buf).await?; + + // Get the len. + let len = if buf[2] == 0xff { + buf.resize(5, 0); + self.source.read_exact(&mut buf[3..5]).await?; + u16::from_le_bytes(buf[3..5].try_into().unwrap()) as usize + } else { + buf[2] as usize + }; + + let start = buf.len(); + buf.resize(start + len, 0); + self.source.read_exact(&mut buf[start..]).await?; + + log::debug!("Read {:?}", buf); + + Ok(T::zvt_parse(&buf)?) + } +} + +impl PacketTransport +where + S: AsyncWriteExt + Unpin + Send, +{ + /// Writes an ADPU packet to the PT. + pub async fn write_packet<'a, T>(&mut self, msg: &T) -> Result<()> + where + T: ZvtSerializer + Sync + Send, + encoding::Default: encoding::Encoding, + { + let bytes = msg.zvt_serialize(); + log::debug!("Write {:?}", bytes); + self.source + .write_all(&bytes) + .await + .map_err(|e| anyhow::anyhow!("Failed to write {:?}", e)) + } +} + +impl PacketTransport +where + S: AsyncWriteExt + AsyncReadExt + Unpin + Send, +{ + /// Reads an ADPU packet from the PT and send an [packets::Ack]. + pub async fn read_packet_with_ack<'a, T>(&mut self) -> Result + where + T: ZvtParser + Send, + { + let packet = self.read_packet::().await?; + self.write_packet(&packets::Ack {}).await?; + + Ok(packet) + } + + /// Writes an ADPU packet to the PT and awaits its [packets::Ack]. + pub async fn write_packet_with_ack<'a, T>(&mut self, msg: &T) -> Result<()> + where + T: ZvtSerializer + Sync + Send, + encoding::Default: encoding::Encoding, + { + self.write_packet(msg).await?; + + let _ = self.read_packet::().await?; + + Ok(()) + } +} diff --git a/zvt/src/lib.rs b/zvt/src/lib.rs index 6966a0a..7b50638 100644 --- a/zvt/src/lib.rs +++ b/zvt/src/lib.rs @@ -1,5 +1,6 @@ pub mod constants; pub mod feig; +pub mod io; pub mod packets; pub mod sequences; diff --git a/zvt/src/sequences.rs b/zvt/src/sequences.rs index 01670a7..a33fd27 100644 --- a/zvt/src/sequences.rs +++ b/zvt/src/sequences.rs @@ -1,59 +1,14 @@ +use crate::io::PacketTransport; use crate::packets; use crate::{encoding, ZvtEnum, ZvtParser, ZvtSerializer}; use anyhow::Result; use async_stream::try_stream; use futures::Stream; -use log::debug; use std::boxed::Box; use std::marker::Unpin; use std::pin::Pin; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -pub async fn read_packet_async(src: &mut Pin<&mut impl AsyncReadExt>) -> Result> { - let mut buf = vec![0; 3]; - src.read_exact(&mut buf).await?; - - // Get the len. - let len = if buf[2] == 0xff { - buf.resize(5, 0); - src.read_exact(&mut buf[3..5]).await?; - u16::from_le_bytes(buf[3..5].try_into().unwrap()) as usize - } else { - buf[2] as usize - }; - - let start = buf.len(); - buf.resize(start + len, 0); - src.read_exact(&mut buf[start..]).await?; - - debug!("Received {buf:?}"); - - Ok(buf.to_vec()) -} - -#[derive(ZvtEnum)] -enum Ack { - Ack(packets::Ack), -} - -pub async fn write_with_ack_async( - p: &T, - src: &mut Pin<&mut (impl AsyncReadExt + AsyncWriteExt)>, -) -> Result<()> -where - T: ZvtSerializer + Sync + Send, - encoding::Default: encoding::Encoding, -{ - // We declare the bytes as a separate variable to help the compiler to - // figure out that we can send stuff between threads. - let bytes = p.zvt_serialize(); - src.write_all(&bytes).await?; - - let bytes = read_packet_async(src).await?; - let _ = Ack::zvt_parse(&bytes)?; - Ok(()) -} - /// The trait for converting a sequence into a stream. /// /// What is written below? The [Self::Input] type must be a command as defined @@ -77,7 +32,7 @@ where fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, @@ -85,13 +40,11 @@ where { let s = try_stream! { // This pin has nothing to do with the fact that we return a Stream - // but is needed to access methods like `write_all`. - tokio::pin!(src); - write_with_ack_async(input, &mut src).await?; - let bytes = read_packet_async(&mut src).await?; - let packet = Self::Output::zvt_parse(&bytes)?; + // but is needed to access methods like `write_packet`. + src.write_packet_with_ack(input).await?; + let packet = src.read_packet().await?; // Write the response. - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + src.write_packet::(&packets::Ack {}).await?; yield packet; }; Box::pin(s) @@ -136,20 +89,18 @@ impl Sequence for ReadCard { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let packet = ReadCardResponse::zvt_parse(&bytes)?; + let packet = src.read_packet().await?; // Write the response. - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + src.write_packet(&packets::Ack {}).await?; match packet { ReadCardResponse::StatusInformation(_) | ReadCardResponse::Abort(_) => { @@ -191,22 +142,20 @@ impl Sequence for Initialization { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); // 2.18.1 - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let response = InitializationResponse::zvt_parse(&bytes)?; + let response = src.read_packet().await?; // Every message requires an Ack. - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + src.write_packet(&packets::Ack {}).await?; match response { InitializationResponse::CompletionData(_) @@ -298,22 +247,20 @@ impl Sequence for Diagnosis { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); // 2.18.1 - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let response = DiagnosisResponse::zvt_parse(&bytes)?; + let response = src.read_packet().await?; // Every message requires an Ack. - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + src.write_packet(&packets::Ack {}).await?; match response { DiagnosisResponse::CompletionData(_) @@ -364,23 +311,21 @@ impl Sequence for EndOfDay { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); // 2.16.1 - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let packet = EndOfDayResponse::zvt_parse(&bytes)?; + let packet = src.read_packet().await?; // Write the response. - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + src.write_packet(&packets::Ack {}).await?; match packet { EndOfDayResponse::CompletionData(_) | EndOfDayResponse::Abort(_) => { yield packet; @@ -431,21 +376,19 @@ impl Sequence for Reservation { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); // 2.8 - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let packet = AuthorizationResponse::zvt_parse(&bytes)?; - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + let packet = src.read_packet().await?; + src.write_packet(&packets::Ack {}).await?; match packet { AuthorizationResponse::CompletionData(_) | AuthorizationResponse::Abort(_) => { yield packet; @@ -502,20 +445,18 @@ impl Sequence for PartialReversal { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let packet = PartialReversalResponse::zvt_parse(&bytes)?; - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + let packet = src.read_packet().await?; + src.write_packet(&packets::Ack {}).await?; match packet { PartialReversalResponse::CompletionData(_) | PartialReversalResponse::PartialReversalAbort(_) => { @@ -542,20 +483,18 @@ impl Sequence for PreAuthReversal { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let packet = PartialReversalResponse::zvt_parse(&bytes)?; - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + let packet = src.read_packet().await?; + src.write_packet(&packets::Ack {}).await?; match packet { PartialReversalResponse::CompletionData(_) | PartialReversalResponse::PartialReversalAbort(_) => { @@ -593,20 +532,18 @@ impl Sequence for PrintSystemConfiguration { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let packet = PrintSystemConfigurationResponse::zvt_parse(&bytes)?; - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + let packet = src.read_packet().await?; + src.write_packet(&packets::Ack {}).await?; match packet { PrintSystemConfigurationResponse::CompletionData(_) => { yield packet; @@ -664,20 +601,18 @@ impl Sequence for StatusEnquiry { fn into_stream<'a, Source>( input: &'a Self::Input, - src: &'a mut Source, + src: &'a mut PacketTransport, ) -> Pin> + Send + 'a>> where Source: AsyncReadExt + AsyncWriteExt + Unpin + Send, Self: 'a, { let s = try_stream! { - tokio::pin!(src); - write_with_ack_async(input, &mut src).await?; + src.write_packet_with_ack(input).await?; loop { - let bytes = read_packet_async(&mut src).await?; - let packet = StatusEnquiryResponse::zvt_parse(&bytes)?; - src.write_all(&packets::Ack {}.zvt_serialize()).await?; + let packet = src.read_packet().await?; + src.write_packet(&packets::Ack {}).await?; match packet { StatusEnquiryResponse::CompletionData(_) => { yield packet;