From cdc6028299cd0d5a2e10a81e4e429e38aa3b3164 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Fri, 9 Feb 2024 17:26:24 -0500 Subject: [PATCH] =?UTF-8?q?server:=20=F0=9F=8C=BD=20`listen=5Fchannel`=20f?= =?UTF-8?q?or=20in-memory=20connections?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit this introduces a `Server::listen_channel` interface. this is like the existing `listen_tcp` or `listen_unix` methods, but instead applies to an in-memory channel of `(read, write)` tuples. this is motivated by work like penumbra-zone/penumbra#3588. to summarize the motivation, it would be nice if we had a way to run a tower-abci service in cargo tests, without needing to bind to an actual port, or create a unix-domain socket within a temporary directory. --- src/v037/server.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/v037/server.rs b/src/v037/server.rs index 3e46c06..9eb8403 100644 --- a/src/v037/server.rs +++ b/src/v037/server.rs @@ -184,6 +184,30 @@ where } } } + + pub async fn listen_channel(self, mut channel: tokio::sync::mpsc::Receiver<(R, W)>) + where + R: AsyncReadExt + std::marker::Unpin + Send + 'static, + W: AsyncWriteExt + std::marker::Unpin + Send + 'static, + { + tracing::info!("ABCI server starting on tokio channel"); + + match channel.recv().await { + Some((read, write)) => { + tracing::debug!("accepted new connection"); + let conn = Connection { + consensus: self.consensus.clone(), + mempool: self.mempool.clone(), + info: self.info.clone(), + snapshot: self.snapshot.clone(), + }; + tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + } + None => { + tracing::trace!("channel closed"); + } + } + } } struct Connection {