A small library for defining I/O types which reconnect on errors.
To get started, add io-tether
to your list of dependencies
io-tether = { version = "0.4.0" }
The primary type exposed by this library is the Tether
type. This
type is generic over two parameters:
-
C
: The I/O connector. This is the type which produces the underlying connections. For some io types like QUIC this may need to be fairly involved, while for io like TCP, it may just be a wrapper around a socket address -
R
: The resolver. This type will likely be generated by you in order to handle the buisness logic required for your application whenever a disconnect occurs. It drives the reconnect process and allows developers to inject arbirtary asynchronous code at various stages of the reconnection process
Below is a simple example of a resolver implmentation that calls back to a channel whenever it detects a disconnect.
use std::{time::Duration, net::{SocketAddrV4, Ipv4Addr}};
use io_tether::{Resolver, Context, Reason, Tether, PinFut, tcp::TcpConnector};
use tokio::{net::TcpStream, io::{AsyncReadExt, AsyncWriteExt}, sync::mpsc};
pub struct ChannelResolver(mpsc::Sender<String>);
type Connector = TcpConnector<SocketAddrV4>;
// NOTE: If you don't need to act on the connector, this can be implemented for generic `C`
impl Resolver<Connector> for ChannelResolver {
fn disconnected(&mut self, context: &Context, conn: &mut Connector) -> PinFut<bool> {
let sender = self.0.clone();
let reason = context.reason().to_string();
// Try port 8081 when retrying
conn.get_addr_mut().set_port(8081);
Box::pin(async move {
// Send the disconnect reason over the channel
sender.send(reason).await.unwrap();
// We can call arbirtary async code here
tokio::time::sleep(Duration::from_millis(500)).await;
true
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel(1);
let resolver = ChannelResolver(tx);
let listener_1 = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
let listener_2 = tokio::net::TcpListener::bind("0.0.0.0:8081").await?;
// Each listener, only accepts 1 connection, writing half of "foobar"
tokio::spawn(async move {
let (mut stream, _addr) = listener_1.accept().await.unwrap();
stream.write_all(b"foo").await.unwrap();
});
tokio::spawn(async move {
let (mut stream, _addr) = listener_2.accept().await.unwrap();
stream.write_all(b"bar").await.unwrap();
});
let handle = tokio::spawn(async move {
// Start by connecting to port 8080
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080);
let mut tether = Tether::connect_tcp(addr, resolver)
.await
.unwrap();
let mut buf = [0; 6];
// A disconnect occurs here after the server writes
// "foo" then drops the client, triggering a disconnect.
// The disconnect is detected and forwarded to the resolver,
// which adjusts the port, sleeps and attempts a reconnect.
//
// The resolver then connects to the new remote socket and we
// pull the next 3 bytes. This all happens under the hood
// without any extra work at each read callsite.
tether.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"foobar");
});
// Since a disconnect occurred during the call to read_exact,
// the channel will contain the disconnect reason
assert!(rx.recv().await.is_some());
handle.await?;
Ok(())
}
-
stubborn-io similar, but uses synchronous callbacks and a duration iterator for retries
-
tokio-retry a more general purpose future retry library