From 0af3b59e4eeaad468ba2550ad560502d12169c34 Mon Sep 17 00:00:00 2001 From: "Heinz N. Gies" Date: Mon, 1 Nov 2021 17:55:44 +0100 Subject: [PATCH] Update wal to cache it's own ids Signed-off-by: Heinz N. Gies --- src/connectors/wal.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/connectors/wal.rs b/src/connectors/wal.rs index d6096fcbaa..5ec992c5d9 100644 --- a/src/connectors/wal.rs +++ b/src/connectors/wal.rs @@ -13,7 +13,7 @@ // limitations under the License. #![cfg(not(tarpaulin_include))] -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use crate::connectors::prelude::*; use async_std::sync::Mutex; @@ -69,6 +69,7 @@ impl ConnectorBuilder for Builder { struct WalSource { origin_uri: EventOriginUri, wal: Arc>, + pull_id_map: HashMap, // FIXME: this is terrible :( } struct Payload(Event); @@ -87,12 +88,14 @@ impl qwal::Entry for Payload { #[async_trait::async_trait] impl Source for WalSource { - async fn pull_data(&mut self, _pull_id: u64, _ctx: &SourceContext) -> Result { + async fn pull_data(&mut self, pull_id: u64, _ctx: &SourceContext) -> Result { if let Some((id, event)) = self.wal.lock().await.pop::().await? { + // FIXME: this is a dirty hack untill we can define the pull_id + self.pull_id_map.insert(pull_id, id); Ok(SourceReply::Structured { origin_uri: self.origin_uri.clone(), payload: event.data, - stream: id, // FIXME: this is a dirty hack untill we can define the pull_id + stream: 0, port: None, }) } else { @@ -100,12 +103,19 @@ impl Source for WalSource { } } - async fn ack(&mut self, stream_id: u64, _pull_id: u64) { + async fn ack(&mut self, _stream_id: u64, pull_id: u64) { // FIXME: this is a dirty hack until we can define the pull_id for a connector // FIXME: we should allow returning errors // FIXME: why is this never being called - dbg!(stream_id); - self.wal.lock().await.ack(stream_id).await.unwrap() + if let Some(id) = self.pull_id_map.remove(&pull_id) { + dbg!(id); + self.wal.lock().await.ack(id).await.unwrap(); + } + } + + async fn fail(&mut self, _stream_id: u64, _pull_id: u64) { + // FIXME: we should allow returning errors + self.wal.lock().await.revert().await.unwrap() } fn is_transactional(&self) -> bool { @@ -140,6 +150,7 @@ impl Connector for Wal { builder: SourceManagerBuilder, ) -> Result> { let s = WalSource { + pull_id_map: HashMap::new(), wal: self.wal.clone(), origin_uri: self.event_origin_uri.clone(), };