Skip to content

Commit

Permalink
feat(rt-sync): add notifications endpoint (#694)
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse authored Feb 9, 2025
1 parent b59e28f commit e674705
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 23 deletions.
8 changes: 0 additions & 8 deletions lib/core/src/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1035,14 +1035,6 @@ impl LiquidSdk {
};
}
Ok(InputType::Bolt11 { invoice }) => {
if let Some(sync_service) = &self.sync_service {
sync_service
.pull()
.await
.map_err(|err| PaymentError::Generic {
err: format!("Could not pull real-time sync changes: {err:?}"),
})?;
}
self.ensure_send_is_not_self_transfer(&invoice.bolt11)?;
self.validate_bolt11_invoice(&invoice.bolt11)?;

Expand Down
14 changes: 12 additions & 2 deletions lib/core/src/sync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ use tonic::{
metadata::{errors::InvalidMetadataValue, Ascii, MetadataValue},
service::{interceptor::InterceptedService, Interceptor},
transport::{Channel, ClientTlsConfig, Endpoint},
Request, Status,
Request, Status, Streaming,
};

use super::model::{
syncer_client::SyncerClient as ProtoSyncerClient, ListChangesReply, ListChangesRequest,
SetRecordReply, SetRecordRequest,
ListenChangesRequest, Notification, SetRecordReply, SetRecordRequest,
};

#[async_trait]
pub(crate) trait SyncerClient: Send + Sync {
async fn connect(&self, connect_url: String) -> Result<()>;
async fn push(&self, req: SetRecordRequest) -> Result<SetRecordReply>;
async fn pull(&self, req: ListChangesRequest) -> Result<ListChangesReply>;
async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>>;
async fn disconnect(&self) -> Result<()>;
}

Expand Down Expand Up @@ -99,6 +100,15 @@ impl SyncerClient for BreezSyncerClient {
.into_inner())
}

async fn listen(&self, req: ListenChangesRequest) -> Result<Streaming<Notification>> {
Ok(self
.get_client()
.await?
.listen_changes(req)
.await?
.into_inner())
}

async fn disconnect(&self) -> Result<()> {
let mut channel = self.grpc_channel.lock().await;
*channel = None;
Expand Down
74 changes: 66 additions & 8 deletions lib/core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ use futures_util::TryFutureExt;
use log::trace;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{watch, Mutex};
use tokio::time::sleep;
use tokio_stream::StreamExt as _;
use tonic::Streaming;

use self::client::SyncerClient;
use self::model::{data::SyncData, ListChangesRequest, RecordType, SyncState};
use self::model::{DecryptionError, SyncOutgoingChanges};
use self::model::{DecryptionError, ListenChangesRequest, Notification, SyncOutgoingChanges};
use crate::prelude::Swap;
use crate::recover::recoverer::Recoverer;
use crate::sync::model::data::{
Expand All @@ -36,6 +39,8 @@ pub(crate) struct SyncService {
sync_trigger: Mutex<Receiver<()>>,
}

type RemoteStream = Option<Streaming<Notification>>;

impl SyncService {
fn set_sync_trigger(persister: Arc<Persister>) -> Receiver<()> {
let (sync_trigger_tx, sync_trigger_rx) = tokio::sync::mpsc::channel::<()>(30);
Expand Down Expand Up @@ -80,6 +85,43 @@ impl SyncService {
}
}

async fn new_listener(&self) -> Result<RemoteStream> {
let req = ListenChangesRequest::new(self.signer.clone())?;
Ok(match self.client.listen(req).await {
Ok(trigger) => Some(trigger),
Err(err) => {
log::warn!("Could not create new remote sync trigger: {err:?}");
None
}
})
}

fn handle_reconnect(
self: Arc<Self>,
remote_trigger: Arc<Mutex<RemoteStream>>,
mut shutdown: watch::Receiver<()>,
) {
tokio::spawn(async move {
let mut remote_trigger = remote_trigger.lock().await;
loop {
tokio::select! {
_ = shutdown.changed() => {
log::info!("Received shutdown signal, exiting realtime sync reconnect loop");
return;
}
Ok(maybe_new_trigger) = self.new_listener() => match maybe_new_trigger {
Some(new_trigger) => {
*remote_trigger = Some(new_trigger);
self.run_event_loop().await;
return;
},
None => sleep(Duration::from_secs(5)).await,
}
}
}
});
}

pub(crate) fn start(self: Arc<Self>, mut shutdown: watch::Receiver<()>) {
tokio::spawn(async move {
if let Err(err) = self.client.connect(self.remote_url.clone()).await {
Expand All @@ -91,16 +133,32 @@ impl SyncService {
return;
}

let mut sync_trigger = self.sync_trigger.lock().await;
let mut event_loop_interval = tokio::time::interval(Duration::from_secs(30));
let mut local_sync_trigger = self.sync_trigger.lock().await;
let remote_sync_trigger =
Arc::new(Mutex::new(self.new_listener().await.unwrap_or(None)));

log::debug!("Starting real-time sync event loop");

loop {
tokio::select! {
_ = event_loop_interval.tick() => self.run_event_loop().await,
Some(_) = sync_trigger.recv() => {
self.run_event_loop().await;
event_loop_interval.reset();
let remote_lock_and_read = async {
let mut lock = remote_sync_trigger.lock().await;
match lock.as_mut() {
Some(trigger) => trigger.next().await,
// Future hangs in case of a missing initial remote trigger,
// tokio_select! branch is never picked
None => std::future::pending().await,
}
};

tokio::select! {
Some(_) = local_sync_trigger.recv() => self.run_event_loop().await,
Some(msg) = remote_lock_and_read => match msg {
Ok(_) => self.run_event_loop().await,
Err(err) => {
log::warn!("Received status {} from remote, attempting to reconnect.", err.message());
self.clone().handle_reconnect(remote_sync_trigger.clone(), shutdown.clone());
}
},
_ = shutdown.changed() => {
log::info!("Received shutdown signal, exiting realtime sync service loop");
if let Err(err) = self.client.disconnect().await {
Expand Down
16 changes: 15 additions & 1 deletion lib/core/src/sync/model/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use lwk_wollet::hashes::hex::DisplayHex as _;
use openssl::sha::sha256;
use std::sync::Arc;

use super::{ListChangesRequest, Record, SetRecordRequest, CURRENT_SCHEMA_VERSION, MESSAGE_PREFIX};
use super::{
ListChangesRequest, ListenChangesRequest, Record, SetRecordRequest, CURRENT_SCHEMA_VERSION,
MESSAGE_PREFIX,
};

fn sign_message(msg: &[u8], signer: Arc<Box<dyn Signer>>) -> Result<String, SignerError> {
let msg = [MESSAGE_PREFIX, msg].concat();
Expand Down Expand Up @@ -56,3 +59,14 @@ impl SetRecordRequest {
})
}
}
impl ListenChangesRequest {
pub(crate) fn new(signer: Arc<Box<dyn Signer>>) -> Result<Self, SignerError> {
let request_time = utils::now();
let msg = format!("{}", request_time);
let signature = sign_message(msg.as_bytes(), signer)?;
Ok(Self {
request_time,
signature,
})
}
}
6 changes: 4 additions & 2 deletions lib/core/src/sync/proto/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package sync;
service Syncer {
rpc SetRecord(SetRecordRequest) returns (SetRecordReply) {}
rpc ListChanges(ListChangesRequest) returns (ListChangesReply) {}
rpc TrackChanges(TrackChangesRequest) returns (stream Record);
rpc ListenChanges(ListenChangesRequest) returns (stream Notification);
}

message Record {
Expand Down Expand Up @@ -37,7 +37,9 @@ message ListChangesRequest {
}
message ListChangesReply { repeated Record changes = 1; }

message TrackChangesRequest {
message ListenChangesRequest {
uint32 request_time = 1;
string signature = 2;
}

message Notification {}
9 changes: 7 additions & 2 deletions lib/core/src/test_utils/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::{
client::SyncerClient,
model::{
data::{ChainSyncData, ReceiveSyncData, SendSyncData},
ListChangesReply, ListChangesRequest, Record, SetRecordReply, SetRecordRequest,
SetRecordStatus,
ListChangesReply, ListChangesRequest, ListenChangesRequest, Notification, Record,
SetRecordReply, SetRecordRequest, SetRecordStatus,
},
SyncService,
},
Expand All @@ -22,6 +22,7 @@ use tokio::sync::{
mpsc::{self, Receiver, Sender},
Mutex,
};
use tonic::Streaming;

pub(crate) struct MockSyncerClient {
pub(crate) incoming_rx: Mutex<Receiver<Record>>,
Expand Down Expand Up @@ -79,6 +80,10 @@ impl SyncerClient for MockSyncerClient {
Ok(ListChangesReply { changes })
}

async fn listen(&self, _req: ListenChangesRequest) -> Result<Streaming<Notification>> {
todo!()
}

async fn disconnect(&self) -> Result<()> {
todo!()
}
Expand Down

0 comments on commit e674705

Please sign in to comment.