Skip to content

Commit

Permalink
Update wal to cache it's own ids
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Nov 1, 2021
1 parent c832fce commit 0af3b59
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions src/connectors/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#![cfg(not(tarpaulin_include))]
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use crate::connectors::prelude::*;
use async_std::sync::Mutex;
Expand Down Expand Up @@ -69,6 +69,7 @@ impl ConnectorBuilder for Builder {
struct WalSource {
origin_uri: EventOriginUri,
wal: Arc<Mutex<qwal::Wal>>,
pull_id_map: HashMap<u64, u64>, // FIXME: this is terrible :(
}

struct Payload(Event);
Expand All @@ -87,25 +88,34 @@ impl qwal::Entry for Payload {

#[async_trait::async_trait]
impl Source for WalSource {
async fn pull_data(&mut self, _pull_id: u64, _ctx: &SourceContext) -> Result<SourceReply> {
async fn pull_data(&mut self, pull_id: u64, _ctx: &SourceContext) -> Result<SourceReply> {
if let Some((id, event)) = self.wal.lock().await.pop::<Payload>().await? {
// FIXME: this is a dirty hack untill we can define the pull_id
self.pull_id_map.insert(pull_id, id);
Ok(SourceReply::Structured {
origin_uri: self.origin_uri.clone(),
payload: event.data,
stream: id, // FIXME: this is a dirty hack untill we can define the pull_id
stream: 0,
port: None,
})
} else {
Ok(SourceReply::Empty(DEFAULT_POLL_INTERVAL))
}
}

async fn ack(&mut self, stream_id: u64, _pull_id: u64) {
async fn ack(&mut self, _stream_id: u64, pull_id: u64) {
// FIXME: this is a dirty hack until we can define the pull_id for a connector
// FIXME: we should allow returning errors
// FIXME: why is this never being called
dbg!(stream_id);
self.wal.lock().await.ack(stream_id).await.unwrap()
if let Some(id) = self.pull_id_map.remove(&pull_id) {
dbg!(id);
self.wal.lock().await.ack(id).await.unwrap();
}
}

async fn fail(&mut self, _stream_id: u64, _pull_id: u64) {
// FIXME: we should allow returning errors
self.wal.lock().await.revert().await.unwrap()
}

fn is_transactional(&self) -> bool {
Expand Down Expand Up @@ -140,6 +150,7 @@ impl Connector for Wal {
builder: SourceManagerBuilder,
) -> Result<Option<SourceAddr>> {
let s = WalSource {
pull_id_map: HashMap::new(),
wal: self.wal.clone(),
origin_uri: self.event_origin_uri.clone(),
};
Expand Down

0 comments on commit 0af3b59

Please sign in to comment.